0d35d53f9d2b924ea8b583fda5b5a3a682be09fb
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = api_call("users/current");
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = api_call("jobs/get", uuid => $jobspec);
166   if (!$force_unlock) {
167     # Claim this job, and make sure nobody else does
168     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = api_call("jobs/create", job => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = api_call("job_tasks/create", job_task => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324   });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338
339 my $build_script;
340 do {
341   local $/ = undef;
342   $build_script = <DATA>;
343 };
344 my $nodelist = join(",", @node);
345
346 if (!defined $no_clear_tmp) {
347   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
348   Log (undef, "Clean work dirs");
349
350   my $cleanpid = fork();
351   if ($cleanpid == 0)
352   {
353     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
354           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
355     exit (1);
356   }
357   while (1)
358   {
359     last if $cleanpid == waitpid (-1, WNOHANG);
360     freeze_if_want_freeze ($cleanpid);
361     select (undef, undef, undef, 0.1);
362   }
363   Log (undef, "Cleanup command exited ".exit_status_s($?));
364 }
365
366
367 my $git_archive;
368 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
369   # If script_version looks like an absolute path, *and* the --git-dir
370   # argument was not given -- which implies we were not invoked by
371   # crunch-dispatch -- we will use the given path as a working
372   # directory instead of resolving script_version to a git commit (or
373   # doing anything else with git).
374   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
376 }
377 else {
378   # Resolve the given script_version to a git commit sha1. Also, if
379   # the repository is remote, clone it into our local filesystem: this
380   # ensures "git archive" will work, and is necessary to reliably
381   # resolve a symbolic script_version like "master^".
382   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
383
384   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
385
386   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
387
388   # If we're running under crunch-dispatch, it will have already
389   # pulled the appropriate source tree into its own repository, and
390   # given us that repo's path as $git_dir.
391   #
392   # If we're running a "local" job, we might have to fetch content
393   # from a remote repository.
394   #
395   # (Currently crunch-dispatch gives a local path with --git-dir, but
396   # we might as well accept URLs there too in case it changes its
397   # mind.)
398   my $repo = $git_dir || $Job->{'repository'};
399
400   # Repository can be remote or local. If remote, we'll need to fetch it
401   # to a local dir before doing `git log` et al.
402   my $repo_location;
403
404   if ($repo =~ m{://|^[^/]*:}) {
405     # $repo is a git url we can clone, like git:// or https:// or
406     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
407     # not recognized here because distinguishing that from a local
408     # path is too fragile. If you really need something strange here,
409     # use the ssh:// form.
410     $repo_location = 'remote';
411   } elsif ($repo =~ m{^\.*/}) {
412     # $repo is a local path to a git index. We'll also resolve ../foo
413     # to ../foo/.git if the latter is a directory. To help
414     # disambiguate local paths from named hosted repositories, this
415     # form must be given as ./ or ../ if it's a relative path.
416     if (-d "$repo/.git") {
417       $repo = "$repo/.git";
418     }
419     $repo_location = 'local';
420   } else {
421     # $repo is none of the above. It must be the name of a hosted
422     # repository.
423     my $arv_repo_list = api_call("repositories/list",
424                                  'filters' => [['name','=',$repo]]);
425     my @repos_found = @{$arv_repo_list->{'items'}};
426     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
427     if ($n_found > 0) {
428       Log(undef, "Repository '$repo' -> "
429           . join(", ", map { $_->{'uuid'} } @repos_found));
430     }
431     if ($n_found != 1) {
432       croak("Error: Found $n_found repositories with name '$repo'.");
433     }
434     $repo = $repos_found[0]->{'fetch_url'};
435     $repo_location = 'remote';
436   }
437   Log(undef, "Using $repo_location repository '$repo'");
438   $ENV{"CRUNCH_SRC_URL"} = $repo;
439
440   # Resolve given script_version (we'll call that $treeish here) to a
441   # commit sha1 ($commit).
442   my $treeish = $Job->{'script_version'};
443   my $commit;
444   if ($repo_location eq 'remote') {
445     # We minimize excess object-fetching by re-using the same bare
446     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
447     # just keep adding remotes to it as needed.
448     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
449     my $gitcmd = "git --git-dir=\Q$local_repo\E";
450
451     # Set up our local repo for caching remote objects, making
452     # archives, etc.
453     if (!-d $local_repo) {
454       make_path($local_repo) or croak("Error: could not create $local_repo");
455     }
456     # This works (exits 0 and doesn't delete fetched objects) even
457     # if $local_repo is already initialized:
458     `$gitcmd init --bare`;
459     if ($?) {
460       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
461     }
462
463     # If $treeish looks like a hash (or abbrev hash) we look it up in
464     # our local cache first, since that's cheaper. (We don't want to
465     # do that with tags/branches though -- those change over time, so
466     # they should always be resolved by the remote repo.)
467     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
468       # Hide stderr because it's normal for this to fail:
469       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
470       if ($? == 0 &&
471           # Careful not to resolve a branch named abcdeff to commit 1234567:
472           $sha1 =~ /^$treeish/ &&
473           $sha1 =~ /^([0-9a-f]{40})$/s) {
474         $commit = $1;
475         Log(undef, "Commit $commit already present in $local_repo");
476       }
477     }
478
479     if (!defined $commit) {
480       # If $treeish isn't just a hash or abbrev hash, or isn't here
481       # yet, we need to fetch the remote to resolve it correctly.
482
483       # First, remove all local heads. This prevents a name that does
484       # not exist on the remote from resolving to (or colliding with)
485       # a previously fetched branch or tag (possibly from a different
486       # remote).
487       remove_tree("$local_repo/refs/heads", {keep_root => 1});
488
489       Log(undef, "Fetching objects from $repo to $local_repo");
490       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
491       if ($?) {
492         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
493       }
494     }
495
496     # Now that the data is all here, we will use our local repo for
497     # the rest of our git activities.
498     $repo = $local_repo;
499   }
500
501   my $gitcmd = "git --git-dir=\Q$repo\E";
502   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
503   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
504     croak("`$gitcmd rev-list` exited "
505           .exit_status_s($?)
506           .", '$treeish' not found. Giving up.");
507   }
508   $commit = $1;
509   Log(undef, "Version $treeish is commit $commit");
510
511   if ($commit ne $Job->{'script_version'}) {
512     # Record the real commit id in the database, frozentokey, logs,
513     # etc. -- instead of an abbreviation or a branch name which can
514     # become ambiguous or point to a different commit in the future.
515     if (!$Job->update_attributes('script_version' => $commit)) {
516       croak("Error: failed to update job's script_version attribute");
517     }
518   }
519
520   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
521   $git_archive = `$gitcmd archive ''\Q$commit\E`;
522   if ($?) {
523     croak("Error: $gitcmd archive exited ".exit_status_s($?));
524   }
525 }
526
527 if (!defined $git_archive) {
528   Log(undef, "Skip install phase (no git archive)");
529   if ($have_slurm) {
530     Log(undef, "Warning: This probably means workers have no source tree!");
531   }
532 }
533 else {
534   Log(undef, "Run install script on all workers");
535
536   my @srunargs = ("srun",
537                   "--nodelist=$nodelist",
538                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
539   my @execargs = ("sh", "-c",
540                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
541
542   my $installpid = fork();
543   if ($installpid == 0)
544   {
545     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
546     exit (1);
547   }
548   while (1)
549   {
550     last if $installpid == waitpid (-1, WNOHANG);
551     freeze_if_want_freeze ($installpid);
552     select (undef, undef, undef, 0.1);
553   }
554   my $install_exited = $?;
555   Log (undef, "Install script exited ".exit_status_s($install_exited));
556   exit (1) if $install_exited != 0;
557 }
558
559 if (!$have_slurm)
560 {
561   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
562   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
563 }
564
565 # If this job requires a Docker image, install that.
566 my $docker_bin = "/usr/bin/docker.io";
567 my ($docker_locator, $docker_stream, $docker_hash);
568 if ($docker_locator = $Job->{docker_image_locator}) {
569   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
570   if (!$docker_hash)
571   {
572     croak("No Docker image hash found from locator $docker_locator");
573   }
574   $docker_stream =~ s/^\.//;
575   my $docker_install_script = qq{
576 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
577     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
578 fi
579 };
580   my $docker_pid = fork();
581   if ($docker_pid == 0)
582   {
583     srun (["srun", "--nodelist=" . join(',', @node)],
584           ["/bin/sh", "-ec", $docker_install_script]);
585     exit ($?);
586   }
587   while (1)
588   {
589     last if $docker_pid == waitpid (-1, WNOHANG);
590     freeze_if_want_freeze ($docker_pid);
591     select (undef, undef, undef, 0.1);
592   }
593   if ($? != 0)
594   {
595     croak("Installing Docker image from $docker_locator exited "
596           .exit_status_s($?));
597   }
598 }
599
600 foreach (qw (script script_version script_parameters runtime_constraints))
601 {
602   Log (undef,
603        "$_ " .
604        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
605 }
606 foreach (split (/\n/, $Job->{knobs}))
607 {
608   Log (undef, "knob " . $_);
609 }
610
611
612
613 $main::success = undef;
614
615
616
617 ONELEVEL:
618
619 my $thisround_succeeded = 0;
620 my $thisround_failed = 0;
621 my $thisround_failed_multiple = 0;
622
623 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
624                        or $a <=> $b } @jobstep_todo;
625 my $level = $jobstep[$jobstep_todo[0]]->{level};
626 Log (undef, "start level $level");
627
628
629
630 my %proc;
631 my @freeslot = (0..$#slot);
632 my @holdslot;
633 my %reader;
634 my $progress_is_dirty = 1;
635 my $progress_stats_updated = 0;
636
637 update_progress_stats();
638
639
640
641 THISROUND:
642 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
643 {
644   my $id = $jobstep_todo[$todo_ptr];
645   my $Jobstep = $jobstep[$id];
646   if ($Jobstep->{level} != $level)
647   {
648     next;
649   }
650
651   pipe $reader{$id}, "writer" or croak ($!);
652   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
653   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
654
655   my $childslot = $freeslot[0];
656   my $childnode = $slot[$childslot]->{node};
657   my $childslotname = join (".",
658                             $slot[$childslot]->{node}->{name},
659                             $slot[$childslot]->{cpu});
660   my $childpid = fork();
661   if ($childpid == 0)
662   {
663     $SIG{'INT'} = 'DEFAULT';
664     $SIG{'QUIT'} = 'DEFAULT';
665     $SIG{'TERM'} = 'DEFAULT';
666
667     foreach (values (%reader))
668     {
669       close($_);
670     }
671     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
672     open(STDOUT,">&writer");
673     open(STDERR,">&writer");
674
675     undef $dbh;
676     undef $sth;
677
678     delete $ENV{"GNUPGHOME"};
679     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
680     $ENV{"TASK_QSEQUENCE"} = $id;
681     $ENV{"TASK_SEQUENCE"} = $level;
682     $ENV{"JOB_SCRIPT"} = $Job->{script};
683     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
684       $param =~ tr/a-z/A-Z/;
685       $ENV{"JOB_PARAMETER_$param"} = $value;
686     }
687     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
688     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
689     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
690     $ENV{"HOME"} = $ENV{"TASK_WORK"};
691     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
692     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
693     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
694     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
695
696     $ENV{"GZIP"} = "-n";
697
698     my @srunargs = (
699       "srun",
700       "--nodelist=".$childnode->{name},
701       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
702       "--job-name=$job_id.$id.$$",
703         );
704     my $command =
705         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
706         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
707         ."&& cd $ENV{CRUNCH_TMP} ";
708     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
709     if ($docker_hash)
710     {
711       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
712       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
713       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
714
715       # Dynamically configure the container to use the host system as its
716       # DNS server.  Get the host's global addresses from the ip command,
717       # and turn them into docker --dns options using gawk.
718       $command .=
719           q{$(ip -o address show scope global |
720               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
721
722       # The source tree and $destdir directory (which we have
723       # installed on the worker host) are available in the container,
724       # under the same path.
725       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
726       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
727
728       # Currently, we make arv-mount's mount point appear at /keep
729       # inside the container (instead of using the same path as the
730       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
731       # crunch scripts and utilities must not rely on this. They must
732       # use $TASK_KEEPMOUNT.
733       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
734       $ENV{TASK_KEEPMOUNT} = "/keep";
735
736       # TASK_WORK is almost exactly like a docker data volume: it
737       # starts out empty, is writable, and persists until no
738       # containers use it any more. We don't use --volumes-from to
739       # share it with other containers: it is only accessible to this
740       # task, and it goes away when this task stops.
741       #
742       # However, a docker data volume is writable only by root unless
743       # the mount point already happens to exist in the container with
744       # different permissions. Therefore, we [1] assume /tmp already
745       # exists in the image and is writable by the crunch user; [2]
746       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
747       # writable if they are created by docker while setting up the
748       # other --volumes); and [3] create $TASK_WORK inside the
749       # container using $build_script.
750       $command .= "--volume=/tmp ";
751       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
752       $ENV{"HOME"} = $ENV{"TASK_WORK"};
753       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
754
755       # TODO: Share a single JOB_WORK volume across all task
756       # containers on a given worker node, and delete it when the job
757       # ends (and, in case that doesn't work, when the next job
758       # starts).
759       #
760       # For now, use the same approach as TASK_WORK above.
761       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
762
763       while (my ($env_key, $env_val) = each %ENV)
764       {
765         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
766           $command .= "--env=\Q$env_key=$env_val\E ";
767         }
768       }
769       $command .= "--env=\QHOME=$ENV{HOME}\E ";
770       $command .= "\Q$docker_hash\E ";
771       $command .= "stdbuf --output=0 --error=0 ";
772       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
773     } else {
774       # Non-docker run
775       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
776       $command .= "stdbuf --output=0 --error=0 ";
777       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
778     }
779
780     my @execargs = ('bash', '-c', $command);
781     srun (\@srunargs, \@execargs, undef, $build_script);
782     # exec() failed, we assume nothing happened.
783     die "srun() failed on build script\n";
784   }
785   close("writer");
786   if (!defined $childpid)
787   {
788     close $reader{$id};
789     delete $reader{$id};
790     next;
791   }
792   shift @freeslot;
793   $proc{$childpid} = { jobstep => $id,
794                        time => time,
795                        slot => $childslot,
796                        jobstepname => "$job_id.$id.$childpid",
797                      };
798   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
799   $slot[$childslot]->{pid} = $childpid;
800
801   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
802   Log ($id, "child $childpid started on $childslotname");
803   $Jobstep->{starttime} = time;
804   $Jobstep->{node} = $childnode->{name};
805   $Jobstep->{slotindex} = $childslot;
806   delete $Jobstep->{stderr};
807   delete $Jobstep->{finishtime};
808
809   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
810   $Jobstep->{'arvados_task'}->save;
811
812   splice @jobstep_todo, $todo_ptr, 1;
813   --$todo_ptr;
814
815   $progress_is_dirty = 1;
816
817   while (!@freeslot
818          ||
819          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
820   {
821     last THISROUND if $main::please_freeze;
822     if ($main::please_info)
823     {
824       $main::please_info = 0;
825       freeze();
826       collate_output();
827       save_meta(1);
828       update_progress_stats();
829     }
830     my $gotsome
831         = readfrompipes ()
832         + reapchildren ();
833     if (!$gotsome)
834     {
835       check_refresh_wanted();
836       check_squeue();
837       update_progress_stats();
838       select (undef, undef, undef, 0.1);
839     }
840     elsif (time - $progress_stats_updated >= 30)
841     {
842       update_progress_stats();
843     }
844     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
845         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
846     {
847       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
848           .($thisround_failed+$thisround_succeeded)
849           .") -- giving up on this round";
850       Log (undef, $message);
851       last THISROUND;
852     }
853
854     # move slots from freeslot to holdslot (or back to freeslot) if necessary
855     for (my $i=$#freeslot; $i>=0; $i--) {
856       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
857         push @holdslot, (splice @freeslot, $i, 1);
858       }
859     }
860     for (my $i=$#holdslot; $i>=0; $i--) {
861       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
862         push @freeslot, (splice @holdslot, $i, 1);
863       }
864     }
865
866     # give up if no nodes are succeeding
867     if (!grep { $_->{node}->{losing_streak} == 0 &&
868                     $_->{node}->{hold_count} < 4 } @slot) {
869       my $message = "Every node has failed -- giving up on this round";
870       Log (undef, $message);
871       last THISROUND;
872     }
873   }
874 }
875
876
877 push @freeslot, splice @holdslot;
878 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
879
880
881 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
882 while (%proc)
883 {
884   if ($main::please_continue) {
885     $main::please_continue = 0;
886     goto THISROUND;
887   }
888   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
889   readfrompipes ();
890   if (!reapchildren())
891   {
892     check_refresh_wanted();
893     check_squeue();
894     update_progress_stats();
895     select (undef, undef, undef, 0.1);
896     killem (keys %proc) if $main::please_freeze;
897   }
898 }
899
900 update_progress_stats();
901 freeze_if_want_freeze();
902
903
904 if (!defined $main::success)
905 {
906   if (@jobstep_todo &&
907       $thisround_succeeded == 0 &&
908       ($thisround_failed == 0 || $thisround_failed > 4))
909   {
910     my $message = "stop because $thisround_failed tasks failed and none succeeded";
911     Log (undef, $message);
912     $main::success = 0;
913   }
914   if (!@jobstep_todo)
915   {
916     $main::success = 1;
917   }
918 }
919
920 goto ONELEVEL if !defined $main::success;
921
922
923 release_allocation();
924 freeze();
925 my $collated_output = &collate_output();
926
927 if (!$collated_output) {
928   Log(undef, "output undef");
929 }
930 else {
931   eval {
932     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
933         or die "failed to get collated manifest: $!";
934     my $orig_manifest_text = '';
935     while (my $manifest_line = <$orig_manifest>) {
936       $orig_manifest_text .= $manifest_line;
937     }
938     my $output = api_call("collections/create", collection => {
939       'manifest_text' => $orig_manifest_text});
940     Log(undef, "output uuid " . $output->{uuid});
941     Log(undef, "output hash " . $output->{portable_data_hash});
942     $Job->update_attributes('output' => $output->{portable_data_hash});
943   };
944   if ($@) {
945     Log (undef, "Failed to register output manifest: $@");
946   }
947 }
948
949 Log (undef, "finish");
950
951 save_meta();
952
953 my $final_state;
954 if ($collated_output && $main::success) {
955   $final_state = 'Complete';
956 } else {
957   $final_state = 'Failed';
958 }
959 $Job->update_attributes('state' => $final_state);
960
961 exit (($final_state eq 'Complete') ? 0 : 1);
962
963
964
965 sub update_progress_stats
966 {
967   $progress_stats_updated = time;
968   return if !$progress_is_dirty;
969   my ($todo, $done, $running) = (scalar @jobstep_todo,
970                                  scalar @jobstep_done,
971                                  scalar @slot - scalar @freeslot - scalar @holdslot);
972   $Job->{'tasks_summary'} ||= {};
973   $Job->{'tasks_summary'}->{'todo'} = $todo;
974   $Job->{'tasks_summary'}->{'done'} = $done;
975   $Job->{'tasks_summary'}->{'running'} = $running;
976   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
977   Log (undef, "status: $done done, $running running, $todo todo");
978   $progress_is_dirty = 0;
979 }
980
981
982
983 sub reapchildren
984 {
985   my $pid = waitpid (-1, WNOHANG);
986   return 0 if $pid <= 0;
987
988   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
989                   . "."
990                   . $slot[$proc{$pid}->{slot}]->{cpu});
991   my $jobstepid = $proc{$pid}->{jobstep};
992   my $elapsed = time - $proc{$pid}->{time};
993   my $Jobstep = $jobstep[$jobstepid];
994
995   my $childstatus = $?;
996   my $exitvalue = $childstatus >> 8;
997   my $exitinfo = "exit ".exit_status_s($childstatus);
998   $Jobstep->{'arvados_task'}->reload;
999   my $task_success = $Jobstep->{'arvados_task'}->{success};
1000
1001   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1002
1003   if (!defined $task_success) {
1004     # task did not indicate one way or the other --> fail
1005     $Jobstep->{'arvados_task'}->{success} = 0;
1006     $Jobstep->{'arvados_task'}->save;
1007     $task_success = 0;
1008   }
1009
1010   if (!$task_success)
1011   {
1012     my $temporary_fail;
1013     $temporary_fail ||= $Jobstep->{node_fail};
1014     $temporary_fail ||= ($exitvalue == 111);
1015
1016     ++$thisround_failed;
1017     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1018
1019     # Check for signs of a failed or misconfigured node
1020     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1021         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1022       # Don't count this against jobstep failure thresholds if this
1023       # node is already suspected faulty and srun exited quickly
1024       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1025           $elapsed < 5) {
1026         Log ($jobstepid, "blaming failure on suspect node " .
1027              $slot[$proc{$pid}->{slot}]->{node}->{name});
1028         $temporary_fail ||= 1;
1029       }
1030       ban_node_by_slot($proc{$pid}->{slot});
1031     }
1032
1033     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1034                              ++$Jobstep->{'failures'},
1035                              $temporary_fail ? 'temporary ' : 'permanent',
1036                              $elapsed));
1037
1038     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1039       # Give up on this task, and the whole job
1040       $main::success = 0;
1041       $main::please_freeze = 1;
1042     }
1043     # Put this task back on the todo queue
1044     push @jobstep_todo, $jobstepid;
1045     $Job->{'tasks_summary'}->{'failed'}++;
1046   }
1047   else
1048   {
1049     ++$thisround_succeeded;
1050     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1051     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1052     push @jobstep_done, $jobstepid;
1053     Log ($jobstepid, "success in $elapsed seconds");
1054   }
1055   $Jobstep->{exitcode} = $childstatus;
1056   $Jobstep->{finishtime} = time;
1057   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1058   $Jobstep->{'arvados_task'}->save;
1059   process_stderr ($jobstepid, $task_success);
1060   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1061
1062   close $reader{$jobstepid};
1063   delete $reader{$jobstepid};
1064   delete $slot[$proc{$pid}->{slot}]->{pid};
1065   push @freeslot, $proc{$pid}->{slot};
1066   delete $proc{$pid};
1067
1068   if ($task_success) {
1069     # Load new tasks
1070     my $newtask_list = [];
1071     my $newtask_results;
1072     do {
1073       $newtask_results = api_call(
1074         "job_tasks/list",
1075         'where' => {
1076           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1077         },
1078         'order' => 'qsequence',
1079         'offset' => scalar(@$newtask_list),
1080       );
1081       push(@$newtask_list, @{$newtask_results->{items}});
1082     } while (@{$newtask_results->{items}});
1083     foreach my $arvados_task (@$newtask_list) {
1084       my $jobstep = {
1085         'level' => $arvados_task->{'sequence'},
1086         'failures' => 0,
1087         'arvados_task' => $arvados_task
1088       };
1089       push @jobstep, $jobstep;
1090       push @jobstep_todo, $#jobstep;
1091     }
1092   }
1093
1094   $progress_is_dirty = 1;
1095   1;
1096 }
1097
1098 sub check_refresh_wanted
1099 {
1100   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1101   if (@stat && $stat[9] > $latest_refresh) {
1102     $latest_refresh = scalar time;
1103     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1104     for my $attr ('cancelled_at',
1105                   'cancelled_by_user_uuid',
1106                   'cancelled_by_client_uuid',
1107                   'state') {
1108       $Job->{$attr} = $Job2->{$attr};
1109     }
1110     if ($Job->{'state'} ne "Running") {
1111       if ($Job->{'state'} eq "Cancelled") {
1112         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1113       } else {
1114         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1115       }
1116       $main::success = 0;
1117       $main::please_freeze = 1;
1118     }
1119   }
1120 }
1121
1122 sub check_squeue
1123 {
1124   # return if the kill list was checked <4 seconds ago
1125   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1126   {
1127     return;
1128   }
1129   $squeue_kill_checked = time;
1130
1131   # use killem() on procs whose killtime is reached
1132   for (keys %proc)
1133   {
1134     if (exists $proc{$_}->{killtime}
1135         && $proc{$_}->{killtime} <= time)
1136     {
1137       killem ($_);
1138     }
1139   }
1140
1141   # return if the squeue was checked <60 seconds ago
1142   if (defined $squeue_checked && $squeue_checked > time - 60)
1143   {
1144     return;
1145   }
1146   $squeue_checked = time;
1147
1148   if (!$have_slurm)
1149   {
1150     # here is an opportunity to check for mysterious problems with local procs
1151     return;
1152   }
1153
1154   # get a list of steps still running
1155   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1156   chop @squeue;
1157   if ($squeue[-1] ne "ok")
1158   {
1159     return;
1160   }
1161   pop @squeue;
1162
1163   # which of my jobsteps are running, according to squeue?
1164   my %ok;
1165   foreach (@squeue)
1166   {
1167     if (/^(\d+)\.(\d+) (\S+)/)
1168     {
1169       if ($1 eq $ENV{SLURM_JOBID})
1170       {
1171         $ok{$3} = 1;
1172       }
1173     }
1174   }
1175
1176   # which of my active child procs (>60s old) were not mentioned by squeue?
1177   foreach (keys %proc)
1178   {
1179     if ($proc{$_}->{time} < time - 60
1180         && !exists $ok{$proc{$_}->{jobstepname}}
1181         && !exists $proc{$_}->{killtime})
1182     {
1183       # kill this proc if it hasn't exited in 30 seconds
1184       $proc{$_}->{killtime} = time + 30;
1185     }
1186   }
1187 }
1188
1189
1190 sub release_allocation
1191 {
1192   if ($have_slurm)
1193   {
1194     Log (undef, "release job allocation");
1195     system "scancel $ENV{SLURM_JOBID}";
1196   }
1197 }
1198
1199
1200 sub readfrompipes
1201 {
1202   my $gotsome = 0;
1203   foreach my $job (keys %reader)
1204   {
1205     my $buf;
1206     while (0 < sysread ($reader{$job}, $buf, 8192))
1207     {
1208       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1209       $jobstep[$job]->{stderr} .= $buf;
1210       preprocess_stderr ($job);
1211       if (length ($jobstep[$job]->{stderr}) > 16384)
1212       {
1213         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1214       }
1215       $gotsome = 1;
1216     }
1217   }
1218   return $gotsome;
1219 }
1220
1221
1222 sub preprocess_stderr
1223 {
1224   my $job = shift;
1225
1226   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1227     my $line = $1;
1228     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1229     Log ($job, "stderr $line");
1230     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1231       # whoa.
1232       $main::please_freeze = 1;
1233     }
1234     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1235       $jobstep[$job]->{node_fail} = 1;
1236       ban_node_by_slot($jobstep[$job]->{slotindex});
1237     }
1238   }
1239 }
1240
1241
1242 sub process_stderr
1243 {
1244   my $job = shift;
1245   my $task_success = shift;
1246   preprocess_stderr ($job);
1247
1248   map {
1249     Log ($job, "stderr $_");
1250   } split ("\n", $jobstep[$job]->{stderr});
1251 }
1252
1253 sub fetch_block
1254 {
1255   my $hash = shift;
1256   my ($keep, $child_out, $output_block);
1257
1258   my $cmd = "arv-get \Q$hash\E";
1259   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1260   $output_block = '';
1261   while (1) {
1262     my $buf;
1263     my $bytes = sysread($keep, $buf, 1024 * 1024);
1264     if (!defined $bytes) {
1265       die "reading from arv-get: $!";
1266     } elsif ($bytes == 0) {
1267       # sysread returns 0 at the end of the pipe.
1268       last;
1269     } else {
1270       # some bytes were read into buf.
1271       $output_block .= $buf;
1272     }
1273   }
1274   close $keep;
1275   return $output_block;
1276 }
1277
1278 sub collate_output
1279 {
1280   Log (undef, "collate");
1281
1282   my ($child_out, $child_in);
1283   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1284                   '--retries', retry_count());
1285   my $joboutput;
1286   for (@jobstep)
1287   {
1288     next if (!exists $_->{'arvados_task'}->{'output'} ||
1289              !$_->{'arvados_task'}->{'success'});
1290     my $output = $_->{'arvados_task'}->{output};
1291     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1292     {
1293       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1294       print $child_in $output;
1295     }
1296     elsif (@jobstep == 1)
1297     {
1298       $joboutput = $output;
1299       last;
1300     }
1301     elsif (defined (my $outblock = fetch_block ($output)))
1302     {
1303       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1304       print $child_in $outblock;
1305     }
1306     else
1307     {
1308       Log (undef, "XXX fetch_block($output) failed XXX");
1309       $main::success = 0;
1310     }
1311   }
1312   $child_in->close;
1313
1314   if (!defined $joboutput) {
1315     my $s = IO::Select->new($child_out);
1316     if ($s->can_read(120)) {
1317       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1318       chomp($joboutput);
1319       # TODO: Ensure exit status == 0.
1320     } else {
1321       Log (undef, "timed out reading from 'arv-put'");
1322     }
1323   }
1324   # TODO: kill $pid instead of waiting, now that we've decided to
1325   # ignore further output.
1326   waitpid($pid, 0);
1327
1328   return $joboutput;
1329 }
1330
1331
1332 sub killem
1333 {
1334   foreach (@_)
1335   {
1336     my $sig = 2;                # SIGINT first
1337     if (exists $proc{$_}->{"sent_$sig"} &&
1338         time - $proc{$_}->{"sent_$sig"} > 4)
1339     {
1340       $sig = 15;                # SIGTERM if SIGINT doesn't work
1341     }
1342     if (exists $proc{$_}->{"sent_$sig"} &&
1343         time - $proc{$_}->{"sent_$sig"} > 4)
1344     {
1345       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1346     }
1347     if (!exists $proc{$_}->{"sent_$sig"})
1348     {
1349       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1350       kill $sig, $_;
1351       select (undef, undef, undef, 0.1);
1352       if ($sig == 2)
1353       {
1354         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1355       }
1356       $proc{$_}->{"sent_$sig"} = time;
1357       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1358     }
1359   }
1360 }
1361
1362
1363 sub fhbits
1364 {
1365   my($bits);
1366   for (@_) {
1367     vec($bits,fileno($_),1) = 1;
1368   }
1369   $bits;
1370 }
1371
1372
1373 # Send log output to Keep via arv-put.
1374 #
1375 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1376 # $log_pipe_pid is the pid of the arv-put subprocess.
1377 #
1378 # The only functions that should access these variables directly are:
1379 #
1380 # log_writer_start($logfilename)
1381 #     Starts an arv-put pipe, reading data on stdin and writing it to
1382 #     a $logfilename file in an output collection.
1383 #
1384 # log_writer_send($txt)
1385 #     Writes $txt to the output log collection.
1386 #
1387 # log_writer_finish()
1388 #     Closes the arv-put pipe and returns the output that it produces.
1389 #
1390 # log_writer_is_active()
1391 #     Returns a true value if there is currently a live arv-put
1392 #     process, false otherwise.
1393 #
1394 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1395
1396 sub log_writer_start($)
1397 {
1398   my $logfilename = shift;
1399   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1400                         'arv-put', '--portable-data-hash',
1401                         '--retries', '3',
1402                         '--filename', $logfilename,
1403                         '-');
1404 }
1405
1406 sub log_writer_send($)
1407 {
1408   my $txt = shift;
1409   print $log_pipe_in $txt;
1410 }
1411
1412 sub log_writer_finish()
1413 {
1414   return unless $log_pipe_pid;
1415
1416   close($log_pipe_in);
1417   my $arv_put_output;
1418
1419   my $s = IO::Select->new($log_pipe_out);
1420   if ($s->can_read(120)) {
1421     sysread($log_pipe_out, $arv_put_output, 1024);
1422     chomp($arv_put_output);
1423   } else {
1424     Log (undef, "timed out reading from 'arv-put'");
1425   }
1426
1427   waitpid($log_pipe_pid, 0);
1428   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1429   if ($?) {
1430     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1431   }
1432
1433   return $arv_put_output;
1434 }
1435
1436 sub log_writer_is_active() {
1437   return $log_pipe_pid;
1438 }
1439
1440 sub Log                         # ($jobstep_id, $logmessage)
1441 {
1442   if ($_[1] =~ /\n/) {
1443     for my $line (split (/\n/, $_[1])) {
1444       Log ($_[0], $line);
1445     }
1446     return;
1447   }
1448   my $fh = select STDERR; $|=1; select $fh;
1449   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1450   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1451   $message .= "\n";
1452   my $datetime;
1453   if (log_writer_is_active() || -t STDERR) {
1454     my @gmtime = gmtime;
1455     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1456                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1457   }
1458   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1459
1460   if (log_writer_is_active()) {
1461     log_writer_send($datetime . " " . $message);
1462   }
1463 }
1464
1465
1466 sub croak
1467 {
1468   my ($package, $file, $line) = caller;
1469   my $message = "@_ at $file line $line\n";
1470   Log (undef, $message);
1471   freeze() if @jobstep_todo;
1472   collate_output() if @jobstep_todo;
1473   cleanup();
1474   save_meta();
1475   die;
1476 }
1477
1478
1479 sub cleanup
1480 {
1481   return unless $Job;
1482   if ($Job->{'state'} eq 'Cancelled') {
1483     $Job->update_attributes('finished_at' => scalar gmtime);
1484   } else {
1485     $Job->update_attributes('state' => 'Failed');
1486   }
1487 }
1488
1489
1490 sub save_meta
1491 {
1492   my $justcheckpoint = shift; # false if this will be the last meta saved
1493   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1494   return unless log_writer_is_active();
1495
1496   my $loglocator = log_writer_finish();
1497   Log (undef, "log manifest is $loglocator");
1498   $Job->{'log'} = $loglocator;
1499   $Job->update_attributes('log', $loglocator);
1500 }
1501
1502
1503 sub freeze_if_want_freeze
1504 {
1505   if ($main::please_freeze)
1506   {
1507     release_allocation();
1508     if (@_)
1509     {
1510       # kill some srun procs before freeze+stop
1511       map { $proc{$_} = {} } @_;
1512       while (%proc)
1513       {
1514         killem (keys %proc);
1515         select (undef, undef, undef, 0.1);
1516         my $died;
1517         while (($died = waitpid (-1, WNOHANG)) > 0)
1518         {
1519           delete $proc{$died};
1520         }
1521       }
1522     }
1523     freeze();
1524     collate_output();
1525     cleanup();
1526     save_meta();
1527     exit 1;
1528   }
1529 }
1530
1531
1532 sub freeze
1533 {
1534   Log (undef, "Freeze not implemented");
1535   return;
1536 }
1537
1538
1539 sub thaw
1540 {
1541   croak ("Thaw not implemented");
1542 }
1543
1544
1545 sub freezequote
1546 {
1547   my $s = shift;
1548   $s =~ s/\\/\\\\/g;
1549   $s =~ s/\n/\\n/g;
1550   return $s;
1551 }
1552
1553
1554 sub freezeunquote
1555 {
1556   my $s = shift;
1557   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1558   return $s;
1559 }
1560
1561
1562 sub srun
1563 {
1564   my $srunargs = shift;
1565   my $execargs = shift;
1566   my $opts = shift || {};
1567   my $stdin = shift;
1568   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1569
1570   $Data::Dumper::Terse = 1;
1571   $Data::Dumper::Indent = 0;
1572   my $show_cmd = Dumper($args);
1573   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1574   $show_cmd =~ s/\n/ /g;
1575   warn "starting: $show_cmd\n";
1576
1577   if (defined $stdin) {
1578     my $child = open STDIN, "-|";
1579     defined $child or die "no fork: $!";
1580     if ($child == 0) {
1581       print $stdin or die $!;
1582       close STDOUT or die $!;
1583       exit 0;
1584     }
1585   }
1586
1587   return system (@$args) if $opts->{fork};
1588
1589   exec @$args;
1590   warn "ENV size is ".length(join(" ",%ENV));
1591   die "exec failed: $!: @$args";
1592 }
1593
1594
1595 sub ban_node_by_slot {
1596   # Don't start any new jobsteps on this node for 60 seconds
1597   my $slotid = shift;
1598   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1599   $slot[$slotid]->{node}->{hold_count}++;
1600   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1601 }
1602
1603 sub must_lock_now
1604 {
1605   my ($lockfile, $error_message) = @_;
1606   open L, ">", $lockfile or croak("$lockfile: $!");
1607   if (!flock L, LOCK_EX|LOCK_NB) {
1608     croak("Can't lock $lockfile: $error_message\n");
1609   }
1610 }
1611
1612 sub find_docker_image {
1613   # Given a Keep locator, check to see if it contains a Docker image.
1614   # If so, return its stream name and Docker hash.
1615   # If not, return undef for both values.
1616   my $locator = shift;
1617   my ($streamname, $filename);
1618   my $image = api_call("collections/get", uuid => $locator);
1619   if ($image) {
1620     foreach my $line (split(/\n/, $image->{manifest_text})) {
1621       my @tokens = split(/\s+/, $line);
1622       next if (!@tokens);
1623       $streamname = shift(@tokens);
1624       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1625         if (defined($filename)) {
1626           return (undef, undef);  # More than one file in the Collection.
1627         } else {
1628           $filename = (split(/:/, $filedata, 3))[2];
1629         }
1630       }
1631     }
1632   }
1633   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1634     return ($streamname, $1);
1635   } else {
1636     return (undef, undef);
1637   }
1638 }
1639
1640 sub retry_count {
1641   # Calculate the number of times an operation should be retried,
1642   # assuming exponential backoff, and that we're willing to retry as
1643   # long as tasks have been running.  Enforce a minimum of 3 retries.
1644   my ($starttime, $endtime, $timediff, $retries);
1645   if (@jobstep) {
1646     $starttime = $jobstep[0]->{starttime};
1647     $endtime = $jobstep[-1]->{finishtime};
1648   }
1649   if (!defined($starttime)) {
1650     $timediff = 0;
1651   } elsif (!defined($endtime)) {
1652     $timediff = time - $starttime;
1653   } else {
1654     $timediff = ($endtime - $starttime) - (time - $endtime);
1655   }
1656   if ($timediff > 0) {
1657     $retries = int(log($timediff) / log(2));
1658   } else {
1659     $retries = 1;  # Use the minimum.
1660   }
1661   return ($retries > 3) ? $retries : 3;
1662 }
1663
1664 sub retry_op {
1665   # Pass in two function references.
1666   # This method will be called with the remaining arguments.
1667   # If it dies, retry it with exponential backoff until it succeeds,
1668   # or until the current retry_count is exhausted.  After each failure
1669   # that can be retried, the second function will be called with
1670   # the current try count (0-based), next try time, and error message.
1671   my $operation = shift;
1672   my $retry_callback = shift;
1673   my $retries = retry_count();
1674   foreach my $try_count (0..$retries) {
1675     my $next_try = time + (2 ** $try_count);
1676     my $result = eval { $operation->(@_); };
1677     if (!$@) {
1678       return $result;
1679     } elsif ($try_count < $retries) {
1680       $retry_callback->($try_count, $next_try, $@);
1681       my $sleep_time = $next_try - time;
1682       sleep($sleep_time) if ($sleep_time > 0);
1683     }
1684   }
1685   # Ensure the error message ends in a newline, so Perl doesn't add
1686   # retry_op's line number to it.
1687   chomp($@);
1688   die($@ . "\n");
1689 }
1690
1691 sub api_call {
1692   # Pass in a /-separated API method name, and arguments for it.
1693   # This function will call that method, retrying as needed until
1694   # the current retry_count is exhausted, with a log on the first failure.
1695   my $method_name = shift;
1696   my $log_api_retry = sub {
1697     my ($try_count, $next_try_at, $errmsg) = @_;
1698     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1699     $errmsg =~ s/\s/ /g;
1700     $errmsg =~ s/\s+$//;
1701     my $retry_msg;
1702     if ($next_try_at < time) {
1703       $retry_msg = "Retrying.";
1704     } else {
1705       my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
1706       $retry_msg = "Retrying at $next_try_fmt.";
1707     }
1708     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1709   };
1710   my $method = $arv;
1711   foreach my $key (split(/\//, $method_name)) {
1712     $method = $method->{$key};
1713   }
1714   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1715 }
1716
1717 sub exit_status_s {
1718   # Given a $?, return a human-readable exit code string like "0" or
1719   # "1" or "0 with signal 1" or "1 with signal 11".
1720   my $exitcode = shift;
1721   my $s = $exitcode >> 8;
1722   if ($exitcode & 0x7f) {
1723     $s .= " with signal " . ($exitcode & 0x7f);
1724   }
1725   if ($exitcode & 0x80) {
1726     $s .= " with core dump";
1727   }
1728   return $s;
1729 }
1730
1731 __DATA__
1732 #!/usr/bin/perl
1733
1734 # checkout-and-build
1735
1736 use Fcntl ':flock';
1737 use File::Path qw( make_path remove_tree );
1738
1739 my $destdir = $ENV{"CRUNCH_SRC"};
1740 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1741 my $repo = $ENV{"CRUNCH_SRC_URL"};
1742 my $job_work = $ENV{"JOB_WORK"};
1743 my $task_work = $ENV{"TASK_WORK"};
1744
1745 for my $dir ($destdir, $job_work, $task_work) {
1746   if ($dir) {
1747     make_path $dir;
1748     -e $dir or die "Failed to create temporary directory ($dir): $!";
1749   }
1750 }
1751
1752 if ($task_work) {
1753   remove_tree($task_work, {keep_root => 1});
1754 }
1755
1756 my @git_archive_data = <DATA>;
1757 if (!@git_archive_data) {
1758   # Nothing to extract -> nothing to install.
1759   run_argv_and_exit();
1760 }
1761
1762 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1763 flock L, LOCK_EX;
1764 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1765   # This version already installed -> nothing to do.
1766   run_argv_and_exit();
1767 }
1768
1769 unlink "$destdir.commit";
1770 open STDERR_ORIG, ">&STDERR";
1771 open STDOUT, ">", "$destdir.log";
1772 open STDERR, ">&STDOUT";
1773
1774 mkdir $destdir;
1775 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1776 print TARX @git_archive_data;
1777 if(!close(TARX)) {
1778   die "'tar -C $destdir -xf -' exited $?: $!";
1779 }
1780
1781 my $pwd;
1782 chomp ($pwd = `pwd`);
1783 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1784 mkdir $install_dir;
1785
1786 for my $src_path ("$destdir/arvados/sdk/python") {
1787   if (-d $src_path) {
1788     shell_or_die ("virtualenv", $install_dir);
1789     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1790   }
1791 }
1792
1793 if (-e "$destdir/crunch_scripts/install") {
1794     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1795 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1796     # Old version
1797     shell_or_die ("./tests/autotests.sh", $install_dir);
1798 } elsif (-e "./install.sh") {
1799     shell_or_die ("./install.sh", $install_dir);
1800 }
1801
1802 if ($commit) {
1803     unlink "$destdir.commit.new";
1804     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1805     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1806 }
1807
1808 close L;
1809
1810 run_argv_and_exit();
1811
1812 sub run_argv_and_exit
1813 {
1814   if (@ARGV) {
1815     exec(@ARGV);
1816     die "Cannot exec `@ARGV`: $!";
1817   } else {
1818     exit 0;
1819   }
1820 }
1821
1822 sub shell_or_die
1823 {
1824   if ($ENV{"DEBUG"}) {
1825     print STDERR "@_\n";
1826   }
1827   if (system (@_) != 0) {
1828     my $err = $!;
1829     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1830     open STDERR, ">&STDERR_ORIG";
1831     system ("cat $destdir.log >&2");
1832     die "@_ failed ($err): $exitstatus";
1833   }
1834 }
1835
1836 __DATA__