4012: crunch-job retries all API operations.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
79 use Arvados;
80 use Digest::MD5 qw(md5_hex);
81 use Getopt::Long;
82 use IPC::Open2;
83 use IO::Select;
84 use File::Temp;
85 use Fcntl ':flock';
86 use File::Path qw( make_path remove_tree );
87
88 use constant EX_TEMPFAIL => 75;
89
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93   if ($ENV{"USER"} ne "crunch" && $< != 0) {
94     # use a tmp dir unique for my uid
95     $ENV{"CRUNCH_TMP"} .= "-$<";
96   }
97 }
98
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
102 }
103
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
108
109 my $force_unlock;
110 my $git_dir;
111 my $jobspec;
112 my $job_api_token;
113 my $no_clear_tmp;
114 my $resume_stash;
115 GetOptions('force-unlock' => \$force_unlock,
116            'git-dir=s' => \$git_dir,
117            'job=s' => \$jobspec,
118            'job-api-token=s' => \$job_api_token,
119            'no-clear-tmp' => \$no_clear_tmp,
120            'resume-stash=s' => \$resume_stash,
121     );
122
123 if (defined $job_api_token) {
124   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 }
126
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $local_job = 0;
129
130
131 $SIG{'USR1'} = sub
132 {
133   $main::ENV{CRUNCH_DEBUG} = 1;
134 };
135 $SIG{'USR2'} = sub
136 {
137   $main::ENV{CRUNCH_DEBUG} = 0;
138 };
139
140
141
142 my $arv = Arvados->new('apiVersion' => 'v1');
143
144 my $Job;
145 my $job_id;
146 my $dbh;
147 my $sth;
148 my @jobstep;
149
150 my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
151
152 if ($jobspec =~ /^[-a-z\d]+$/)
153 {
154   # $jobspec is an Arvados UUID, not a JSON job specification
155   $Job = retry_op(sub {
156     $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
157   });
158   if (!$force_unlock) {
159     # Claim this job, and make sure nobody else does
160     eval { retry_op(sub {
161       # lock() sets is_locked_by_uuid and changes state to Running.
162       $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
163     }); };
164     if ($@) {
165       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
166       exit EX_TEMPFAIL;
167     };
168   }
169 }
170 else
171 {
172   $Job = JSON::decode_json($jobspec);
173
174   if (!$resume_stash)
175   {
176     map { croak ("No $_ specified") unless $Job->{$_} }
177     qw(script script_version script_parameters);
178   }
179
180   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
181   $Job->{'started_at'} = gmtime;
182   $Job->{'state'} = 'Running';
183
184   $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
185 }
186 $job_id = $Job->{'uuid'};
187
188 my $keep_logfile = $job_id . '.log.txt';
189 log_writer_start($keep_logfile);
190
191 $Job->{'runtime_constraints'} ||= {};
192 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
193 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
194
195
196 Log (undef, "check slurm allocation");
197 my @slot;
198 my @node;
199 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my @sinfo;
201 if (!$have_slurm)
202 {
203   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
204   push @sinfo, "$localcpus localhost";
205 }
206 if (exists $ENV{SLURM_NODELIST})
207 {
208   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 }
210 foreach (@sinfo)
211 {
212   my ($ncpus, $slurm_nodelist) = split;
213   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
214
215   my @nodelist;
216   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
217   {
218     my $nodelist = $1;
219     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
220     {
221       my $ranges = $1;
222       foreach (split (",", $ranges))
223       {
224         my ($a, $b);
225         if (/(\d+)-(\d+)/)
226         {
227           $a = $1;
228           $b = $2;
229         }
230         else
231         {
232           $a = $_;
233           $b = $_;
234         }
235         push @nodelist, map {
236           my $n = $nodelist;
237           $n =~ s/\[[-,\d]+\]/$_/;
238           $n;
239         } ($a..$b);
240       }
241     }
242     else
243     {
244       push @nodelist, $nodelist;
245     }
246   }
247   foreach my $nodename (@nodelist)
248   {
249     Log (undef, "node $nodename - $ncpus slots");
250     my $node = { name => $nodename,
251                  ncpus => $ncpus,
252                  losing_streak => 0,
253                  hold_until => 0 };
254     foreach my $cpu (1..$ncpus)
255     {
256       push @slot, { node => $node,
257                     cpu => $cpu };
258     }
259   }
260   push @node, @nodelist;
261 }
262
263
264
265 # Ensure that we get one jobstep running on each allocated node before
266 # we start overloading nodes with concurrent steps
267
268 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
269
270
271 $Job->update_attributes(
272   'tasks_summary' => { 'failed' => 0,
273                        'todo' => 1,
274                        'running' => 0,
275                        'done' => 0 });
276
277 Log (undef, "start");
278 $SIG{'INT'} = sub { $main::please_freeze = 1; };
279 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
280 $SIG{'TERM'} = \&croak;
281 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
282 $SIG{'ALRM'} = sub { $main::please_info = 1; };
283 $SIG{'CONT'} = sub { $main::please_continue = 1; };
284 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
285
286 $main::please_freeze = 0;
287 $main::please_info = 0;
288 $main::please_continue = 0;
289 $main::please_refresh = 0;
290 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
291
292 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
293 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
294 $ENV{"JOB_UUID"} = $job_id;
295
296
297 my @jobstep_todo = ();
298 my @jobstep_done = ();
299 my @jobstep_tomerge = ();
300 my $jobstep_tomerge_level = 0;
301 my $squeue_checked;
302 my $squeue_kill_checked;
303 my $output_in_keep = 0;
304 my $latest_refresh = scalar time;
305
306
307
308 if (defined $Job->{thawedfromkey})
309 {
310   thaw ($Job->{thawedfromkey});
311 }
312 else
313 {
314   my $first_task = retry_op(sub {
315     $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
316       'job_uuid' => $Job->{'uuid'},
317       'sequence' => 0,
318       'qsequence' => 0,
319       'parameters' => {},
320     });
321   });
322   push @jobstep, { 'level' => 0,
323                    'failures' => 0,
324                    'arvados_task' => $first_task,
325                  };
326   push @jobstep_todo, 0;
327 }
328
329
330 if (!$have_slurm)
331 {
332   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
333 }
334
335
336 my $build_script;
337 do {
338   local $/ = undef;
339   $build_script = <DATA>;
340 };
341 my $nodelist = join(",", @node);
342
343 if (!defined $no_clear_tmp) {
344   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
345   Log (undef, "Clean work dirs");
346
347   my $cleanpid = fork();
348   if ($cleanpid == 0)
349   {
350     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
351           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
352     exit (1);
353   }
354   while (1)
355   {
356     last if $cleanpid == waitpid (-1, WNOHANG);
357     freeze_if_want_freeze ($cleanpid);
358     select (undef, undef, undef, 0.1);
359   }
360   Log (undef, "Cleanup command exited ".exit_status_s($?));
361 }
362
363
364 my $git_archive;
365 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
366   # If we're in user-land (i.e., not called from crunch-dispatch)
367   # script_version can be an absolute directory path, signifying we
368   # should work straight out of that directory instead of using a git
369   # commit.
370   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
371   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
372 }
373 else {
374   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
375
376   # Install requested code version
377   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
378
379   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
380
381   # If we're running under crunch-dispatch, it will have already
382   # pulled the appropriate source tree into its own repository, and
383   # given us that repo's path as $git_dir.
384   #
385   # If we're running a "local" job, we might have to fetch content
386   # from a remote repository.
387   #
388   # (Currently crunch-dispatch gives a local path with --git-dir, but
389   # we might as well accept URLs there too in case it changes its
390   # mind.)
391   my $repo = $git_dir || $Job->{'repository'};
392
393   # Repository can be remote or local. If remote, we'll need to fetch it
394   # to a local dir before doing `git log` et al.
395   my $repo_location;
396
397   if ($repo =~ m{://|^[^/]*:}) {
398     # $repo is a git url we can clone, like git:// or https:// or
399     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
400     # not recognized here because distinguishing that from a local
401     # path is too fragile. If you really need something strange here,
402     # use the ssh:// form.
403     $repo_location = 'remote';
404   } elsif ($repo =~ m{^\.*/}) {
405     # $repo is a local path to a git index. We'll also resolve ../foo
406     # to ../foo/.git if the latter is a directory. To help
407     # disambiguate local paths from named hosted repositories, this
408     # form must be given as ./ or ../ if it's a relative path.
409     if (-d "$repo/.git") {
410       $repo = "$repo/.git";
411     }
412     $repo_location = 'local';
413   } else {
414     # $repo is none of the above. It must be the name of a hosted
415     # repository.
416     my $arv_repo_list = retry_op(sub {
417       $arv->{'repositories'}->{'list'}->execute(
418         'filters' => [['name','=',$repo]])->{'items'};
419     });
420     my $n_found = scalar @{$arv_repo_list};
421     if ($n_found > 0) {
422       Log(undef, "Repository '$repo' -> "
423           . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
424     }
425     if ($n_found != 1) {
426       croak("Error: Found $n_found repositories with name '$repo'.");
427     }
428     $repo = $arv_repo_list->[0]->{'fetch_url'};
429     $repo_location = 'remote';
430   }
431   Log(undef, "Using $repo_location repository '$repo'");
432   $ENV{"CRUNCH_SRC_URL"} = $repo;
433
434   # Resolve given script_version (we'll call that $treeish here) to a
435   # commit sha1 ($commit).
436   my $treeish = $Job->{'script_version'};
437   my $commit;
438   if ($repo_location eq 'remote') {
439     # We minimize excess object-fetching by re-using the same bare
440     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
441     # just keep adding remotes to it as needed.
442     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
443     my $gitcmd = "git --git-dir=\Q$local_repo\E";
444
445     # Set up our local repo for caching remote objects, making
446     # archives, etc.
447     if (!-d $local_repo) {
448       make_path($local_repo) or croak("Error: could not create $local_repo");
449     }
450     # This works (exits 0 and doesn't delete fetched objects) even
451     # if $local_repo is already initialized:
452     `$gitcmd init --bare`;
453     if ($?) {
454       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
455     }
456
457     # If $treeish looks like a hash (or abbrev hash) we look it up in
458     # our local cache first, since that's cheaper. (We don't want to
459     # do that with tags/branches though -- those change over time, so
460     # they should always be resolved by the remote repo.)
461     if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
462       # Hide stderr because it's normal for this to fail:
463       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
464       if ($? == 0 &&
465           $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
466           $sha1 =~ /^([0-9a-f]{40})$/s) {
467         $commit = $1;
468         Log(undef, "Commit $commit already present in $local_repo");
469       }
470     }
471
472     if (!defined $commit) {
473       # If $treeish isn't just a hash or abbrev hash, or isn't here
474       # yet, we need to fetch the remote to resolve it correctly.
475
476       # First, remove all local heads. This prevents a name that does
477       # not exist on the remote from resolving to (or colliding with)
478       # a previously fetched branch or tag (possibly from a different
479       # remote).
480       remove_tree("$local_repo/refs/heads", {keep_root => 1});
481
482       Log(undef, "Fetching objects from $repo to $local_repo");
483       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
484       if ($?) {
485         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
486       }
487     }
488
489     # Now that the data is all here, we will use our local repo for
490     # the rest of our git activities.
491     $repo = $local_repo;
492   }
493
494   my $gitcmd = "git --git-dir=\Q$repo\E";
495   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
496   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
497     croak("`$gitcmd rev-list` exited "
498           .exit_status_s($?)
499           .", '$treeish' not found. Giving up.");
500   }
501   $commit = $1;
502   Log(undef, "Version $treeish is commit $commit");
503
504   if ($commit ne $Job->{'script_version'}) {
505     # Record the real commit id in the database, frozentokey, logs,
506     # etc. -- instead of an abbreviation or a branch name which can
507     # become ambiguous or point to a different commit in the future.
508     if (!$Job->update_attributes('script_version' => $commit)) {
509       croak("Error: failed to update job's script_version attribute");
510     }
511   }
512
513   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
514   $git_archive = `$gitcmd archive ''\Q$commit\E`;
515   if ($?) {
516     croak("Error: $gitcmd archive exited ".exit_status_s($?));
517   }
518 }
519
520 if (!defined $git_archive) {
521   Log(undef, "Skip install phase (no git archive)");
522   if ($have_slurm) {
523     Log(undef, "Warning: This probably means workers have no source tree!");
524   }
525 }
526 else {
527   Log(undef, "Run install script on all workers");
528
529   my @srunargs = ("srun",
530                   "--nodelist=$nodelist",
531                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
532   my @execargs = ("sh", "-c",
533                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
534
535   # Note: this section is almost certainly unnecessary if we're
536   # running tasks in docker containers.
537   my $installpid = fork();
538   if ($installpid == 0)
539   {
540     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
541     exit (1);
542   }
543   while (1)
544   {
545     last if $installpid == waitpid (-1, WNOHANG);
546     freeze_if_want_freeze ($installpid);
547     select (undef, undef, undef, 0.1);
548   }
549   Log (undef, "Install script exited ".exit_status_s($?));
550 }
551
552 if (!$have_slurm)
553 {
554   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
555   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
556 }
557
558 # If this job requires a Docker image, install that.
559 my $docker_bin = "/usr/bin/docker.io";
560 my ($docker_locator, $docker_stream, $docker_hash);
561 if ($docker_locator = $Job->{docker_image_locator}) {
562   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
563   if (!$docker_hash)
564   {
565     croak("No Docker image hash found from locator $docker_locator");
566   }
567   $docker_stream =~ s/^\.//;
568   my $docker_install_script = qq{
569 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
570     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
571 fi
572 };
573   my $docker_pid = fork();
574   if ($docker_pid == 0)
575   {
576     srun (["srun", "--nodelist=" . join(',', @node)],
577           ["/bin/sh", "-ec", $docker_install_script]);
578     exit ($?);
579   }
580   while (1)
581   {
582     last if $docker_pid == waitpid (-1, WNOHANG);
583     freeze_if_want_freeze ($docker_pid);
584     select (undef, undef, undef, 0.1);
585   }
586   if ($? != 0)
587   {
588     croak("Installing Docker image from $docker_locator exited "
589           .exit_status_s($?));
590   }
591 }
592
593 foreach (qw (script script_version script_parameters runtime_constraints))
594 {
595   Log (undef,
596        "$_ " .
597        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
598 }
599 foreach (split (/\n/, $Job->{knobs}))
600 {
601   Log (undef, "knob " . $_);
602 }
603
604
605
606 $main::success = undef;
607
608
609
610 ONELEVEL:
611
612 my $thisround_succeeded = 0;
613 my $thisround_failed = 0;
614 my $thisround_failed_multiple = 0;
615
616 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
617                        or $a <=> $b } @jobstep_todo;
618 my $level = $jobstep[$jobstep_todo[0]]->{level};
619 Log (undef, "start level $level");
620
621
622
623 my %proc;
624 my @freeslot = (0..$#slot);
625 my @holdslot;
626 my %reader;
627 my $progress_is_dirty = 1;
628 my $progress_stats_updated = 0;
629
630 update_progress_stats();
631
632
633
634 THISROUND:
635 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
636 {
637   my $id = $jobstep_todo[$todo_ptr];
638   my $Jobstep = $jobstep[$id];
639   if ($Jobstep->{level} != $level)
640   {
641     next;
642   }
643
644   pipe $reader{$id}, "writer" or croak ($!);
645   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
646   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
647
648   my $childslot = $freeslot[0];
649   my $childnode = $slot[$childslot]->{node};
650   my $childslotname = join (".",
651                             $slot[$childslot]->{node}->{name},
652                             $slot[$childslot]->{cpu});
653   my $childpid = fork();
654   if ($childpid == 0)
655   {
656     $SIG{'INT'} = 'DEFAULT';
657     $SIG{'QUIT'} = 'DEFAULT';
658     $SIG{'TERM'} = 'DEFAULT';
659
660     foreach (values (%reader))
661     {
662       close($_);
663     }
664     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
665     open(STDOUT,">&writer");
666     open(STDERR,">&writer");
667
668     undef $dbh;
669     undef $sth;
670
671     delete $ENV{"GNUPGHOME"};
672     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
673     $ENV{"TASK_QSEQUENCE"} = $id;
674     $ENV{"TASK_SEQUENCE"} = $level;
675     $ENV{"JOB_SCRIPT"} = $Job->{script};
676     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
677       $param =~ tr/a-z/A-Z/;
678       $ENV{"JOB_PARAMETER_$param"} = $value;
679     }
680     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
681     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
682     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
683     $ENV{"HOME"} = $ENV{"TASK_WORK"};
684     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
685     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
686     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
687     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
688
689     $ENV{"GZIP"} = "-n";
690
691     my @srunargs = (
692       "srun",
693       "--nodelist=".$childnode->{name},
694       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
695       "--job-name=$job_id.$id.$$",
696         );
697     my $build_script_to_send = "";
698     my $command =
699         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
700         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
701         ."&& cd $ENV{CRUNCH_TMP} ";
702     if ($build_script)
703     {
704       $build_script_to_send = $build_script;
705       $command .=
706           "&& perl -";
707     }
708     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
709     if ($docker_hash)
710     {
711       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
712       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
713       # Dynamically configure the container to use the host system as its
714       # DNS server.  Get the host's global addresses from the ip command,
715       # and turn them into docker --dns options using gawk.
716       $command .=
717           q{$(ip -o address show scope global |
718               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
719       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
720       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
721       $command .= "--env=\QHOME=/home/crunch\E ";
722       while (my ($env_key, $env_val) = each %ENV)
723       {
724         if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
725           if ($env_key eq "TASK_WORK") {
726             $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
727           }
728           elsif ($env_key eq "TASK_KEEPMOUNT") {
729             $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
730           }
731           else {
732             $command .= "--env=\Q$env_key=$env_val\E ";
733           }
734         }
735       }
736       $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
737       $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
738       $command .= "\Q$docker_hash\E ";
739       $command .= "stdbuf --output=0 --error=0 ";
740       $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
741     } else {
742       # Non-docker run
743       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
744       $command .= "stdbuf --output=0 --error=0 ";
745       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
746     }
747
748     my @execargs = ('bash', '-c', $command);
749     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
750     # exec() failed, we assume nothing happened.
751     Log(undef, "srun() failed on build script");
752     die;
753   }
754   close("writer");
755   if (!defined $childpid)
756   {
757     close $reader{$id};
758     delete $reader{$id};
759     next;
760   }
761   shift @freeslot;
762   $proc{$childpid} = { jobstep => $id,
763                        time => time,
764                        slot => $childslot,
765                        jobstepname => "$job_id.$id.$childpid",
766                      };
767   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
768   $slot[$childslot]->{pid} = $childpid;
769
770   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
771   Log ($id, "child $childpid started on $childslotname");
772   $Jobstep->{starttime} = time;
773   $Jobstep->{node} = $childnode->{name};
774   $Jobstep->{slotindex} = $childslot;
775   delete $Jobstep->{stderr};
776   delete $Jobstep->{finishtime};
777
778   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
779   $Jobstep->{'arvados_task'}->save;
780
781   splice @jobstep_todo, $todo_ptr, 1;
782   --$todo_ptr;
783
784   $progress_is_dirty = 1;
785
786   while (!@freeslot
787          ||
788          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
789   {
790     last THISROUND if $main::please_freeze;
791     if ($main::please_info)
792     {
793       $main::please_info = 0;
794       freeze();
795       collate_output();
796       save_meta(1);
797       update_progress_stats();
798     }
799     my $gotsome
800         = readfrompipes ()
801         + reapchildren ();
802     if (!$gotsome)
803     {
804       check_refresh_wanted();
805       check_squeue();
806       update_progress_stats();
807       select (undef, undef, undef, 0.1);
808     }
809     elsif (time - $progress_stats_updated >= 30)
810     {
811       update_progress_stats();
812     }
813     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
814         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
815     {
816       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
817           .($thisround_failed+$thisround_succeeded)
818           .") -- giving up on this round";
819       Log (undef, $message);
820       last THISROUND;
821     }
822
823     # move slots from freeslot to holdslot (or back to freeslot) if necessary
824     for (my $i=$#freeslot; $i>=0; $i--) {
825       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
826         push @holdslot, (splice @freeslot, $i, 1);
827       }
828     }
829     for (my $i=$#holdslot; $i>=0; $i--) {
830       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
831         push @freeslot, (splice @holdslot, $i, 1);
832       }
833     }
834
835     # give up if no nodes are succeeding
836     if (!grep { $_->{node}->{losing_streak} == 0 &&
837                     $_->{node}->{hold_count} < 4 } @slot) {
838       my $message = "Every node has failed -- giving up on this round";
839       Log (undef, $message);
840       last THISROUND;
841     }
842   }
843 }
844
845
846 push @freeslot, splice @holdslot;
847 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
848
849
850 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
851 while (%proc)
852 {
853   if ($main::please_continue) {
854     $main::please_continue = 0;
855     goto THISROUND;
856   }
857   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
858   readfrompipes ();
859   if (!reapchildren())
860   {
861     check_refresh_wanted();
862     check_squeue();
863     update_progress_stats();
864     select (undef, undef, undef, 0.1);
865     killem (keys %proc) if $main::please_freeze;
866   }
867 }
868
869 update_progress_stats();
870 freeze_if_want_freeze();
871
872
873 if (!defined $main::success)
874 {
875   if (@jobstep_todo &&
876       $thisround_succeeded == 0 &&
877       ($thisround_failed == 0 || $thisround_failed > 4))
878   {
879     my $message = "stop because $thisround_failed tasks failed and none succeeded";
880     Log (undef, $message);
881     $main::success = 0;
882   }
883   if (!@jobstep_todo)
884   {
885     $main::success = 1;
886   }
887 }
888
889 goto ONELEVEL if !defined $main::success;
890
891
892 release_allocation();
893 freeze();
894 my $collated_output = &collate_output();
895
896 if (!$collated_output) {
897   Log(undef, "output undef");
898 }
899 else {
900   eval {
901     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
902         or die "failed to get collated manifest: $!";
903     my $orig_manifest_text = '';
904     while (my $manifest_line = <$orig_manifest>) {
905       $orig_manifest_text .= $manifest_line;
906     }
907     my $output = retry_op(sub {
908       $arv->{'collections'}->{'create'}->execute(
909         'collection' => {'manifest_text' => $orig_manifest_text});
910     });
911     Log(undef, "output uuid " . $output->{uuid});
912     Log(undef, "output hash " . $output->{portable_data_hash});
913     $Job->update_attributes('output' => $output->{portable_data_hash});
914   };
915   if ($@) {
916     Log (undef, "Failed to register output manifest: $@");
917   }
918 }
919
920 Log (undef, "finish");
921
922 save_meta();
923
924 my $final_state;
925 if ($collated_output && $main::success) {
926   $final_state = 'Complete';
927 } else {
928   $final_state = 'Failed';
929 }
930 $Job->update_attributes('state' => $final_state);
931
932 exit (($final_state eq 'Complete') ? 0 : 1);
933
934
935
936 sub update_progress_stats
937 {
938   $progress_stats_updated = time;
939   return if !$progress_is_dirty;
940   my ($todo, $done, $running) = (scalar @jobstep_todo,
941                                  scalar @jobstep_done,
942                                  scalar @slot - scalar @freeslot - scalar @holdslot);
943   $Job->{'tasks_summary'} ||= {};
944   $Job->{'tasks_summary'}->{'todo'} = $todo;
945   $Job->{'tasks_summary'}->{'done'} = $done;
946   $Job->{'tasks_summary'}->{'running'} = $running;
947   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
948   Log (undef, "status: $done done, $running running, $todo todo");
949   $progress_is_dirty = 0;
950 }
951
952
953
954 sub reapchildren
955 {
956   my $pid = waitpid (-1, WNOHANG);
957   return 0 if $pid <= 0;
958
959   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
960                   . "."
961                   . $slot[$proc{$pid}->{slot}]->{cpu});
962   my $jobstepid = $proc{$pid}->{jobstep};
963   my $elapsed = time - $proc{$pid}->{time};
964   my $Jobstep = $jobstep[$jobstepid];
965
966   my $childstatus = $?;
967   my $exitvalue = $childstatus >> 8;
968   my $exitinfo = "exit ".exit_status_s($childstatus);
969   $Jobstep->{'arvados_task'}->reload;
970   my $task_success = $Jobstep->{'arvados_task'}->{success};
971
972   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
973
974   if (!defined $task_success) {
975     # task did not indicate one way or the other --> fail
976     $Jobstep->{'arvados_task'}->{success} = 0;
977     $Jobstep->{'arvados_task'}->save;
978     $task_success = 0;
979   }
980
981   if (!$task_success)
982   {
983     my $temporary_fail;
984     $temporary_fail ||= $Jobstep->{node_fail};
985     $temporary_fail ||= ($exitvalue == 111);
986
987     ++$thisround_failed;
988     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
989
990     # Check for signs of a failed or misconfigured node
991     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
992         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
993       # Don't count this against jobstep failure thresholds if this
994       # node is already suspected faulty and srun exited quickly
995       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
996           $elapsed < 5) {
997         Log ($jobstepid, "blaming failure on suspect node " .
998              $slot[$proc{$pid}->{slot}]->{node}->{name});
999         $temporary_fail ||= 1;
1000       }
1001       ban_node_by_slot($proc{$pid}->{slot});
1002     }
1003
1004     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1005                              ++$Jobstep->{'failures'},
1006                              $temporary_fail ? 'temporary ' : 'permanent',
1007                              $elapsed));
1008
1009     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1010       # Give up on this task, and the whole job
1011       $main::success = 0;
1012       $main::please_freeze = 1;
1013     }
1014     # Put this task back on the todo queue
1015     push @jobstep_todo, $jobstepid;
1016     $Job->{'tasks_summary'}->{'failed'}++;
1017   }
1018   else
1019   {
1020     ++$thisround_succeeded;
1021     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1022     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1023     push @jobstep_done, $jobstepid;
1024     Log ($jobstepid, "success in $elapsed seconds");
1025   }
1026   $Jobstep->{exitcode} = $childstatus;
1027   $Jobstep->{finishtime} = time;
1028   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1029   $Jobstep->{'arvados_task'}->save;
1030   process_stderr ($jobstepid, $task_success);
1031   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1032
1033   close $reader{$jobstepid};
1034   delete $reader{$jobstepid};
1035   delete $slot[$proc{$pid}->{slot}]->{pid};
1036   push @freeslot, $proc{$pid}->{slot};
1037   delete $proc{$pid};
1038
1039   if ($task_success) {
1040     # Load new tasks
1041     my $newtask_list = [];
1042     my $newtask_results;
1043     do {
1044       $newtask_results = retry_op(sub {
1045         $arv->{'job_tasks'}->{'list'}->execute(
1046           'where' => {
1047             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1048           },
1049           'order' => 'qsequence',
1050           'offset' => scalar(@$newtask_list),
1051         );
1052       });
1053       push(@$newtask_list, @{$newtask_results->{items}});
1054     } while (@{$newtask_results->{items}});
1055     foreach my $arvados_task (@$newtask_list) {
1056       my $jobstep = {
1057         'level' => $arvados_task->{'sequence'},
1058         'failures' => 0,
1059         'arvados_task' => $arvados_task
1060       };
1061       push @jobstep, $jobstep;
1062       push @jobstep_todo, $#jobstep;
1063     }
1064   }
1065
1066   $progress_is_dirty = 1;
1067   1;
1068 }
1069
1070 sub check_refresh_wanted
1071 {
1072   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1073   if (@stat && $stat[9] > $latest_refresh) {
1074     $latest_refresh = scalar time;
1075     my $Job2 = retry_op(sub {
1076       $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1077     });
1078     for my $attr ('cancelled_at',
1079                   'cancelled_by_user_uuid',
1080                   'cancelled_by_client_uuid',
1081                   'state') {
1082       $Job->{$attr} = $Job2->{$attr};
1083     }
1084     if ($Job->{'state'} ne "Running") {
1085       if ($Job->{'state'} eq "Cancelled") {
1086         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1087       } else {
1088         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1089       }
1090       $main::success = 0;
1091       $main::please_freeze = 1;
1092     }
1093   }
1094 }
1095
1096 sub check_squeue
1097 {
1098   # return if the kill list was checked <4 seconds ago
1099   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1100   {
1101     return;
1102   }
1103   $squeue_kill_checked = time;
1104
1105   # use killem() on procs whose killtime is reached
1106   for (keys %proc)
1107   {
1108     if (exists $proc{$_}->{killtime}
1109         && $proc{$_}->{killtime} <= time)
1110     {
1111       killem ($_);
1112     }
1113   }
1114
1115   # return if the squeue was checked <60 seconds ago
1116   if (defined $squeue_checked && $squeue_checked > time - 60)
1117   {
1118     return;
1119   }
1120   $squeue_checked = time;
1121
1122   if (!$have_slurm)
1123   {
1124     # here is an opportunity to check for mysterious problems with local procs
1125     return;
1126   }
1127
1128   # get a list of steps still running
1129   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1130   chop @squeue;
1131   if ($squeue[-1] ne "ok")
1132   {
1133     return;
1134   }
1135   pop @squeue;
1136
1137   # which of my jobsteps are running, according to squeue?
1138   my %ok;
1139   foreach (@squeue)
1140   {
1141     if (/^(\d+)\.(\d+) (\S+)/)
1142     {
1143       if ($1 eq $ENV{SLURM_JOBID})
1144       {
1145         $ok{$3} = 1;
1146       }
1147     }
1148   }
1149
1150   # which of my active child procs (>60s old) were not mentioned by squeue?
1151   foreach (keys %proc)
1152   {
1153     if ($proc{$_}->{time} < time - 60
1154         && !exists $ok{$proc{$_}->{jobstepname}}
1155         && !exists $proc{$_}->{killtime})
1156     {
1157       # kill this proc if it hasn't exited in 30 seconds
1158       $proc{$_}->{killtime} = time + 30;
1159     }
1160   }
1161 }
1162
1163
1164 sub release_allocation
1165 {
1166   if ($have_slurm)
1167   {
1168     Log (undef, "release job allocation");
1169     system "scancel $ENV{SLURM_JOBID}";
1170   }
1171 }
1172
1173
1174 sub readfrompipes
1175 {
1176   my $gotsome = 0;
1177   foreach my $job (keys %reader)
1178   {
1179     my $buf;
1180     while (0 < sysread ($reader{$job}, $buf, 8192))
1181     {
1182       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1183       $jobstep[$job]->{stderr} .= $buf;
1184       preprocess_stderr ($job);
1185       if (length ($jobstep[$job]->{stderr}) > 16384)
1186       {
1187         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1188       }
1189       $gotsome = 1;
1190     }
1191   }
1192   return $gotsome;
1193 }
1194
1195
1196 sub preprocess_stderr
1197 {
1198   my $job = shift;
1199
1200   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1201     my $line = $1;
1202     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1203     Log ($job, "stderr $line");
1204     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1205       # whoa.
1206       $main::please_freeze = 1;
1207     }
1208     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1209       $jobstep[$job]->{node_fail} = 1;
1210       ban_node_by_slot($jobstep[$job]->{slotindex});
1211     }
1212   }
1213 }
1214
1215
1216 sub process_stderr
1217 {
1218   my $job = shift;
1219   my $task_success = shift;
1220   preprocess_stderr ($job);
1221
1222   map {
1223     Log ($job, "stderr $_");
1224   } split ("\n", $jobstep[$job]->{stderr});
1225 }
1226
1227 sub fetch_block
1228 {
1229   my $hash = shift;
1230   my ($keep, $child_out, $output_block);
1231
1232   my $cmd = "arv-get \Q$hash\E";
1233   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1234   $output_block = '';
1235   while (1) {
1236     my $buf;
1237     my $bytes = sysread($keep, $buf, 1024 * 1024);
1238     if (!defined $bytes) {
1239       die "reading from arv-get: $!";
1240     } elsif ($bytes == 0) {
1241       # sysread returns 0 at the end of the pipe.
1242       last;
1243     } else {
1244       # some bytes were read into buf.
1245       $output_block .= $buf;
1246     }
1247   }
1248   close $keep;
1249   return $output_block;
1250 }
1251
1252 sub collate_output
1253 {
1254   Log (undef, "collate");
1255
1256   my ($child_out, $child_in);
1257   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1258                   '--retries', retry_count());
1259   my $joboutput;
1260   for (@jobstep)
1261   {
1262     next if (!exists $_->{'arvados_task'}->{'output'} ||
1263              !$_->{'arvados_task'}->{'success'});
1264     my $output = $_->{'arvados_task'}->{output};
1265     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1266     {
1267       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1268       print $child_in $output;
1269     }
1270     elsif (@jobstep == 1)
1271     {
1272       $joboutput = $output;
1273       last;
1274     }
1275     elsif (defined (my $outblock = fetch_block ($output)))
1276     {
1277       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1278       print $child_in $outblock;
1279     }
1280     else
1281     {
1282       Log (undef, "XXX fetch_block($output) failed XXX");
1283       $main::success = 0;
1284     }
1285   }
1286   $child_in->close;
1287
1288   if (!defined $joboutput) {
1289     my $s = IO::Select->new($child_out);
1290     if ($s->can_read(120)) {
1291       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1292       chomp($joboutput);
1293       # TODO: Ensure exit status == 0.
1294     } else {
1295       Log (undef, "timed out reading from 'arv-put'");
1296     }
1297   }
1298   # TODO: kill $pid instead of waiting, now that we've decided to
1299   # ignore further output.
1300   waitpid($pid, 0);
1301
1302   return $joboutput;
1303 }
1304
1305
1306 sub killem
1307 {
1308   foreach (@_)
1309   {
1310     my $sig = 2;                # SIGINT first
1311     if (exists $proc{$_}->{"sent_$sig"} &&
1312         time - $proc{$_}->{"sent_$sig"} > 4)
1313     {
1314       $sig = 15;                # SIGTERM if SIGINT doesn't work
1315     }
1316     if (exists $proc{$_}->{"sent_$sig"} &&
1317         time - $proc{$_}->{"sent_$sig"} > 4)
1318     {
1319       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1320     }
1321     if (!exists $proc{$_}->{"sent_$sig"})
1322     {
1323       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1324       kill $sig, $_;
1325       select (undef, undef, undef, 0.1);
1326       if ($sig == 2)
1327       {
1328         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1329       }
1330       $proc{$_}->{"sent_$sig"} = time;
1331       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1332     }
1333   }
1334 }
1335
1336
1337 sub fhbits
1338 {
1339   my($bits);
1340   for (@_) {
1341     vec($bits,fileno($_),1) = 1;
1342   }
1343   $bits;
1344 }
1345
1346
1347 # Send log output to Keep via arv-put.
1348 #
1349 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1350 # $log_pipe_pid is the pid of the arv-put subprocess.
1351 #
1352 # The only functions that should access these variables directly are:
1353 #
1354 # log_writer_start($logfilename)
1355 #     Starts an arv-put pipe, reading data on stdin and writing it to
1356 #     a $logfilename file in an output collection.
1357 #
1358 # log_writer_send($txt)
1359 #     Writes $txt to the output log collection.
1360 #
1361 # log_writer_finish()
1362 #     Closes the arv-put pipe and returns the output that it produces.
1363 #
1364 # log_writer_is_active()
1365 #     Returns a true value if there is currently a live arv-put
1366 #     process, false otherwise.
1367 #
1368 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1369
1370 sub log_writer_start($)
1371 {
1372   my $logfilename = shift;
1373   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1374                         'arv-put', '--portable-data-hash',
1375                         '--retries', '3',
1376                         '--filename', $logfilename,
1377                         '-');
1378 }
1379
1380 sub log_writer_send($)
1381 {
1382   my $txt = shift;
1383   print $log_pipe_in $txt;
1384 }
1385
1386 sub log_writer_finish()
1387 {
1388   return unless $log_pipe_pid;
1389
1390   close($log_pipe_in);
1391   my $arv_put_output;
1392
1393   my $s = IO::Select->new($log_pipe_out);
1394   if ($s->can_read(120)) {
1395     sysread($log_pipe_out, $arv_put_output, 1024);
1396     chomp($arv_put_output);
1397   } else {
1398     Log (undef, "timed out reading from 'arv-put'");
1399   }
1400
1401   waitpid($log_pipe_pid, 0);
1402   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1403   if ($?) {
1404     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1405   }
1406
1407   return $arv_put_output;
1408 }
1409
1410 sub log_writer_is_active() {
1411   return $log_pipe_pid;
1412 }
1413
1414 sub Log                         # ($jobstep_id, $logmessage)
1415 {
1416   if ($_[1] =~ /\n/) {
1417     for my $line (split (/\n/, $_[1])) {
1418       Log ($_[0], $line);
1419     }
1420     return;
1421   }
1422   my $fh = select STDERR; $|=1; select $fh;
1423   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1424   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1425   $message .= "\n";
1426   my $datetime;
1427   if (log_writer_is_active() || -t STDERR) {
1428     my @gmtime = gmtime;
1429     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1430                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1431   }
1432   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1433
1434   if (log_writer_is_active()) {
1435     log_writer_send($datetime . " " . $message);
1436   }
1437 }
1438
1439
1440 sub croak
1441 {
1442   my ($package, $file, $line) = caller;
1443   my $message = "@_ at $file line $line\n";
1444   Log (undef, $message);
1445   freeze() if @jobstep_todo;
1446   collate_output() if @jobstep_todo;
1447   cleanup() if $Job;
1448   save_meta() if log_writer_is_active();
1449   die;
1450 }
1451
1452
1453 sub cleanup
1454 {
1455   if ($Job->{'state'} eq 'Cancelled') {
1456     $Job->update_attributes('finished_at' => scalar gmtime);
1457   } else {
1458     $Job->update_attributes('state' => 'Failed');
1459   }
1460 }
1461
1462
1463 sub save_meta
1464 {
1465   my $justcheckpoint = shift; # false if this will be the last meta saved
1466   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1467
1468   my $loglocator = log_writer_finish();
1469   Log (undef, "log manifest is $loglocator");
1470   $Job->{'log'} = $loglocator;
1471   $Job->update_attributes('log', $loglocator);
1472 }
1473
1474
1475 sub freeze_if_want_freeze
1476 {
1477   if ($main::please_freeze)
1478   {
1479     release_allocation();
1480     if (@_)
1481     {
1482       # kill some srun procs before freeze+stop
1483       map { $proc{$_} = {} } @_;
1484       while (%proc)
1485       {
1486         killem (keys %proc);
1487         select (undef, undef, undef, 0.1);
1488         my $died;
1489         while (($died = waitpid (-1, WNOHANG)) > 0)
1490         {
1491           delete $proc{$died};
1492         }
1493       }
1494     }
1495     freeze();
1496     collate_output();
1497     cleanup();
1498     save_meta();
1499     exit 1;
1500   }
1501 }
1502
1503
1504 sub freeze
1505 {
1506   Log (undef, "Freeze not implemented");
1507   return;
1508 }
1509
1510
1511 sub thaw
1512 {
1513   croak ("Thaw not implemented");
1514 }
1515
1516
1517 sub freezequote
1518 {
1519   my $s = shift;
1520   $s =~ s/\\/\\\\/g;
1521   $s =~ s/\n/\\n/g;
1522   return $s;
1523 }
1524
1525
1526 sub freezeunquote
1527 {
1528   my $s = shift;
1529   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1530   return $s;
1531 }
1532
1533
1534 sub srun
1535 {
1536   my $srunargs = shift;
1537   my $execargs = shift;
1538   my $opts = shift || {};
1539   my $stdin = shift;
1540   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1541   print STDERR (join (" ",
1542                       map { / / ? "'$_'" : $_ }
1543                       (@$args)),
1544                 "\n")
1545       if $ENV{CRUNCH_DEBUG};
1546
1547   if (defined $stdin) {
1548     my $child = open STDIN, "-|";
1549     defined $child or die "no fork: $!";
1550     if ($child == 0) {
1551       print $stdin or die $!;
1552       close STDOUT or die $!;
1553       exit 0;
1554     }
1555   }
1556
1557   return system (@$args) if $opts->{fork};
1558
1559   exec @$args;
1560   warn "ENV size is ".length(join(" ",%ENV));
1561   die "exec failed: $!: @$args";
1562 }
1563
1564
1565 sub ban_node_by_slot {
1566   # Don't start any new jobsteps on this node for 60 seconds
1567   my $slotid = shift;
1568   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1569   $slot[$slotid]->{node}->{hold_count}++;
1570   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1571 }
1572
1573 sub must_lock_now
1574 {
1575   my ($lockfile, $error_message) = @_;
1576   open L, ">", $lockfile or croak("$lockfile: $!");
1577   if (!flock L, LOCK_EX|LOCK_NB) {
1578     croak("Can't lock $lockfile: $error_message\n");
1579   }
1580 }
1581
1582 sub find_docker_image {
1583   # Given a Keep locator, check to see if it contains a Docker image.
1584   # If so, return its stream name and Docker hash.
1585   # If not, return undef for both values.
1586   my $locator = shift;
1587   my ($streamname, $filename);
1588   my $image = retry_op(sub {
1589     $arv->{collections}->{get}->execute(uuid => $locator);
1590   });
1591   if ($image) {
1592     foreach my $line (split(/\n/, $image->{manifest_text})) {
1593       my @tokens = split(/\s+/, $line);
1594       next if (!@tokens);
1595       $streamname = shift(@tokens);
1596       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1597         if (defined($filename)) {
1598           return (undef, undef);  # More than one file in the Collection.
1599         } else {
1600           $filename = (split(/:/, $filedata, 3))[2];
1601         }
1602       }
1603     }
1604   }
1605   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1606     return ($streamname, $1);
1607   } else {
1608     return (undef, undef);
1609   }
1610 }
1611
1612 sub retry_count {
1613   # Calculate the number of times an operation should be retried,
1614   # assuming exponential backoff, and that we're willing to retry as
1615   # long as tasks have been running.  Enforce a minimum of 3 retries.
1616   my ($starttime, $endtime, $timediff, $retries);
1617   if (@jobstep) {
1618     $starttime = $jobstep[0]->{starttime};
1619     $endtime = $jobstep[-1]->{finishtime};
1620   }
1621   if (!defined($starttime)) {
1622     $timediff = 0;
1623   } elsif (!defined($endtime)) {
1624     $timediff = time - $starttime;
1625   } else {
1626     $timediff = ($endtime - $starttime) - (time - $endtime);
1627   }
1628   if ($timediff > 0) {
1629     $retries = int(log($timediff) / log(2));
1630   } else {
1631     $retries = 1;  # Use the minimum.
1632   }
1633   return ($retries > 3) ? $retries : 3;
1634 }
1635
1636 sub retry_op {
1637   # Given a function reference, call it with the remaining arguments.  If
1638   # it dies, retry it with exponential backoff until it succeeds, or until
1639   # the current retry_count is exhausted.
1640   my $operation = shift;
1641   my $retries = retry_count();
1642   foreach my $try_count (0..$retries) {
1643     my $next_try = time + (2 ** $try_count);
1644     my $result = eval { $operation->(@_); };
1645     if (!$@) {
1646       return $result;
1647     } elsif ($try_count < $retries) {
1648       my $sleep_time = $next_try - time;
1649       sleep($sleep_time) if ($sleep_time > 0);
1650     }
1651   }
1652   # Ensure the error message ends in a newline, so Perl doesn't add
1653   # retry_op's line number to it.
1654   chomp($@);
1655   die($@ . "\n");
1656 }
1657
1658 sub exit_status_s {
1659   # Given a $?, return a human-readable exit code string like "0" or
1660   # "1" or "0 with signal 1" or "1 with signal 11".
1661   my $exitcode = shift;
1662   my $s = $exitcode >> 8;
1663   if ($exitcode & 0x7f) {
1664     $s .= " with signal " . ($exitcode & 0x7f);
1665   }
1666   if ($exitcode & 0x80) {
1667     $s .= " with core dump";
1668   }
1669   return $s;
1670 }
1671
1672 __DATA__
1673 #!/usr/bin/perl
1674
1675 # checkout-and-build
1676
1677 use Fcntl ':flock';
1678 use File::Path qw( make_path );
1679
1680 my $destdir = $ENV{"CRUNCH_SRC"};
1681 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1682 my $repo = $ENV{"CRUNCH_SRC_URL"};
1683 my $task_work = $ENV{"TASK_WORK"};
1684
1685 for my $dir ($destdir, $task_work) {
1686     if ($dir) {
1687         make_path $dir;
1688         -e $dir or die "Failed to create temporary directory ($dir): $!";
1689     }
1690 }
1691
1692 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1693 flock L, LOCK_EX;
1694 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1695     if (@ARGV) {
1696         exec(@ARGV);
1697         die "Cannot exec `@ARGV`: $!";
1698     } else {
1699         exit 0;
1700     }
1701 }
1702
1703 unlink "$destdir.commit";
1704 open STDOUT, ">", "$destdir.log";
1705 open STDERR, ">&STDOUT";
1706
1707 mkdir $destdir;
1708 my @git_archive_data = <DATA>;
1709 if (@git_archive_data) {
1710   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1711   print TARX @git_archive_data;
1712   if(!close(TARX)) {
1713     die "'tar -C $destdir -xf -' exited $?: $!";
1714   }
1715 }
1716
1717 my $pwd;
1718 chomp ($pwd = `pwd`);
1719 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1720 mkdir $install_dir;
1721
1722 for my $src_path ("$destdir/arvados/sdk/python") {
1723   if (-d $src_path) {
1724     shell_or_die ("virtualenv", $install_dir);
1725     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1726   }
1727 }
1728
1729 if (-e "$destdir/crunch_scripts/install") {
1730     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1731 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1732     # Old version
1733     shell_or_die ("./tests/autotests.sh", $install_dir);
1734 } elsif (-e "./install.sh") {
1735     shell_or_die ("./install.sh", $install_dir);
1736 }
1737
1738 if ($commit) {
1739     unlink "$destdir.commit.new";
1740     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1741     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1742 }
1743
1744 close L;
1745
1746 if (@ARGV) {
1747     exec(@ARGV);
1748     die "Cannot exec `@ARGV`: $!";
1749 } else {
1750     exit 0;
1751 }
1752
1753 sub shell_or_die
1754 {
1755   if ($ENV{"DEBUG"}) {
1756     print STDERR "@_\n";
1757   }
1758   system (@_) == 0
1759       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1760 }
1761
1762 __DATA__