43eebf830687f01d964b1311327e88581df369b0
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = api_call("users/current");
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = api_call("jobs/get", uuid => $jobspec);
166   if (!$force_unlock) {
167     # Claim this job, and make sure nobody else does
168     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = api_call("jobs/create", job => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = api_call("job_tasks/create", job_task => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324   });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338
339 my $build_script;
340 do {
341   local $/ = undef;
342   $build_script = <DATA>;
343 };
344 my $nodelist = join(",", @node);
345
346 if (!defined $no_clear_tmp) {
347   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
348   Log (undef, "Clean work dirs");
349
350   my $cleanpid = fork();
351   if ($cleanpid == 0)
352   {
353     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
354           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
355     exit (1);
356   }
357   while (1)
358   {
359     last if $cleanpid == waitpid (-1, WNOHANG);
360     freeze_if_want_freeze ($cleanpid);
361     select (undef, undef, undef, 0.1);
362   }
363   Log (undef, "Cleanup command exited ".exit_status_s($?));
364 }
365
366
367 my $git_archive;
368 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
369   # If script_version looks like an absolute path, *and* the --git-dir
370   # argument was not given -- which implies we were not invoked by
371   # crunch-dispatch -- we will use the given path as a working
372   # directory instead of resolving script_version to a git commit (or
373   # doing anything else with git).
374   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
376 }
377 else {
378   # Resolve the given script_version to a git commit sha1. Also, if
379   # the repository is remote, clone it into our local filesystem: this
380   # ensures "git archive" will work, and is necessary to reliably
381   # resolve a symbolic script_version like "master^".
382   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
383
384   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
385
386   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
387
388   # If we're running under crunch-dispatch, it will have already
389   # pulled the appropriate source tree into its own repository, and
390   # given us that repo's path as $git_dir.
391   #
392   # If we're running a "local" job, we might have to fetch content
393   # from a remote repository.
394   #
395   # (Currently crunch-dispatch gives a local path with --git-dir, but
396   # we might as well accept URLs there too in case it changes its
397   # mind.)
398   my $repo = $git_dir || $Job->{'repository'};
399
400   # Repository can be remote or local. If remote, we'll need to fetch it
401   # to a local dir before doing `git log` et al.
402   my $repo_location;
403
404   if ($repo =~ m{://|^[^/]*:}) {
405     # $repo is a git url we can clone, like git:// or https:// or
406     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
407     # not recognized here because distinguishing that from a local
408     # path is too fragile. If you really need something strange here,
409     # use the ssh:// form.
410     $repo_location = 'remote';
411   } elsif ($repo =~ m{^\.*/}) {
412     # $repo is a local path to a git index. We'll also resolve ../foo
413     # to ../foo/.git if the latter is a directory. To help
414     # disambiguate local paths from named hosted repositories, this
415     # form must be given as ./ or ../ if it's a relative path.
416     if (-d "$repo/.git") {
417       $repo = "$repo/.git";
418     }
419     $repo_location = 'local';
420   } else {
421     # $repo is none of the above. It must be the name of a hosted
422     # repository.
423     my $arv_repo_list = api_call("repositories/list",
424                                  'filters' => [['name','=',$repo]]);
425     my @repos_found = @{$arv_repo_list->{'items'}};
426     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
427     if ($n_found > 0) {
428       Log(undef, "Repository '$repo' -> "
429           . join(", ", map { $_->{'uuid'} } @repos_found));
430     }
431     if ($n_found != 1) {
432       croak("Error: Found $n_found repositories with name '$repo'.");
433     }
434     $repo = $repos_found[0]->{'fetch_url'};
435     $repo_location = 'remote';
436   }
437   Log(undef, "Using $repo_location repository '$repo'");
438   $ENV{"CRUNCH_SRC_URL"} = $repo;
439
440   # Resolve given script_version (we'll call that $treeish here) to a
441   # commit sha1 ($commit).
442   my $treeish = $Job->{'script_version'};
443   my $commit;
444   if ($repo_location eq 'remote') {
445     # We minimize excess object-fetching by re-using the same bare
446     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
447     # just keep adding remotes to it as needed.
448     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
449     my $gitcmd = "git --git-dir=\Q$local_repo\E";
450
451     # Set up our local repo for caching remote objects, making
452     # archives, etc.
453     if (!-d $local_repo) {
454       make_path($local_repo) or croak("Error: could not create $local_repo");
455     }
456     # This works (exits 0 and doesn't delete fetched objects) even
457     # if $local_repo is already initialized:
458     `$gitcmd init --bare`;
459     if ($?) {
460       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
461     }
462
463     # If $treeish looks like a hash (or abbrev hash) we look it up in
464     # our local cache first, since that's cheaper. (We don't want to
465     # do that with tags/branches though -- those change over time, so
466     # they should always be resolved by the remote repo.)
467     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
468       # Hide stderr because it's normal for this to fail:
469       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
470       if ($? == 0 &&
471           # Careful not to resolve a branch named abcdeff to commit 1234567:
472           $sha1 =~ /^$treeish/ &&
473           $sha1 =~ /^([0-9a-f]{40})$/s) {
474         $commit = $1;
475         Log(undef, "Commit $commit already present in $local_repo");
476       }
477     }
478
479     if (!defined $commit) {
480       # If $treeish isn't just a hash or abbrev hash, or isn't here
481       # yet, we need to fetch the remote to resolve it correctly.
482
483       # First, remove all local heads. This prevents a name that does
484       # not exist on the remote from resolving to (or colliding with)
485       # a previously fetched branch or tag (possibly from a different
486       # remote).
487       remove_tree("$local_repo/refs/heads", {keep_root => 1});
488
489       Log(undef, "Fetching objects from $repo to $local_repo");
490       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
491       if ($?) {
492         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
493       }
494     }
495
496     # Now that the data is all here, we will use our local repo for
497     # the rest of our git activities.
498     $repo = $local_repo;
499   }
500
501   my $gitcmd = "git --git-dir=\Q$repo\E";
502   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
503   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
504     croak("`$gitcmd rev-list` exited "
505           .exit_status_s($?)
506           .", '$treeish' not found. Giving up.");
507   }
508   $commit = $1;
509   Log(undef, "Version $treeish is commit $commit");
510
511   if ($commit ne $Job->{'script_version'}) {
512     # Record the real commit id in the database, frozentokey, logs,
513     # etc. -- instead of an abbreviation or a branch name which can
514     # become ambiguous or point to a different commit in the future.
515     if (!$Job->update_attributes('script_version' => $commit)) {
516       croak("Error: failed to update job's script_version attribute");
517     }
518   }
519
520   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
521   $git_archive = `$gitcmd archive ''\Q$commit\E`;
522   if ($?) {
523     croak("Error: $gitcmd archive exited ".exit_status_s($?));
524   }
525 }
526
527 if (!defined $git_archive) {
528   Log(undef, "Skip install phase (no git archive)");
529   if ($have_slurm) {
530     Log(undef, "Warning: This probably means workers have no source tree!");
531   }
532 }
533 else {
534   Log(undef, "Run install script on all workers");
535
536   my @srunargs = ("srun",
537                   "--nodelist=$nodelist",
538                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
539   my @execargs = ("sh", "-c",
540                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
541
542   my $installpid = fork();
543   if ($installpid == 0)
544   {
545     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
546     exit (1);
547   }
548   while (1)
549   {
550     last if $installpid == waitpid (-1, WNOHANG);
551     freeze_if_want_freeze ($installpid);
552     select (undef, undef, undef, 0.1);
553   }
554   Log (undef, "Install script exited ".exit_status_s($?));
555 }
556
557 if (!$have_slurm)
558 {
559   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
560   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
561 }
562
563 # If this job requires a Docker image, install that.
564 my $docker_bin = "/usr/bin/docker.io";
565 my ($docker_locator, $docker_stream, $docker_hash);
566 if ($docker_locator = $Job->{docker_image_locator}) {
567   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
568   if (!$docker_hash)
569   {
570     croak("No Docker image hash found from locator $docker_locator");
571   }
572   $docker_stream =~ s/^\.//;
573   my $docker_install_script = qq{
574 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
575     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
576 fi
577 };
578   my $docker_pid = fork();
579   if ($docker_pid == 0)
580   {
581     srun (["srun", "--nodelist=" . join(',', @node)],
582           ["/bin/sh", "-ec", $docker_install_script]);
583     exit ($?);
584   }
585   while (1)
586   {
587     last if $docker_pid == waitpid (-1, WNOHANG);
588     freeze_if_want_freeze ($docker_pid);
589     select (undef, undef, undef, 0.1);
590   }
591   if ($? != 0)
592   {
593     croak("Installing Docker image from $docker_locator exited "
594           .exit_status_s($?));
595   }
596 }
597
598 foreach (qw (script script_version script_parameters runtime_constraints))
599 {
600   Log (undef,
601        "$_ " .
602        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
603 }
604 foreach (split (/\n/, $Job->{knobs}))
605 {
606   Log (undef, "knob " . $_);
607 }
608
609
610
611 $main::success = undef;
612
613
614
615 ONELEVEL:
616
617 my $thisround_succeeded = 0;
618 my $thisround_failed = 0;
619 my $thisround_failed_multiple = 0;
620
621 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
622                        or $a <=> $b } @jobstep_todo;
623 my $level = $jobstep[$jobstep_todo[0]]->{level};
624 Log (undef, "start level $level");
625
626
627
628 my %proc;
629 my @freeslot = (0..$#slot);
630 my @holdslot;
631 my %reader;
632 my $progress_is_dirty = 1;
633 my $progress_stats_updated = 0;
634
635 update_progress_stats();
636
637
638
639 THISROUND:
640 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
641 {
642   my $id = $jobstep_todo[$todo_ptr];
643   my $Jobstep = $jobstep[$id];
644   if ($Jobstep->{level} != $level)
645   {
646     next;
647   }
648
649   pipe $reader{$id}, "writer" or croak ($!);
650   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
651   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
652
653   my $childslot = $freeslot[0];
654   my $childnode = $slot[$childslot]->{node};
655   my $childslotname = join (".",
656                             $slot[$childslot]->{node}->{name},
657                             $slot[$childslot]->{cpu});
658   my $childpid = fork();
659   if ($childpid == 0)
660   {
661     $SIG{'INT'} = 'DEFAULT';
662     $SIG{'QUIT'} = 'DEFAULT';
663     $SIG{'TERM'} = 'DEFAULT';
664
665     foreach (values (%reader))
666     {
667       close($_);
668     }
669     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
670     open(STDOUT,">&writer");
671     open(STDERR,">&writer");
672
673     undef $dbh;
674     undef $sth;
675
676     delete $ENV{"GNUPGHOME"};
677     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
678     $ENV{"TASK_QSEQUENCE"} = $id;
679     $ENV{"TASK_SEQUENCE"} = $level;
680     $ENV{"JOB_SCRIPT"} = $Job->{script};
681     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
682       $param =~ tr/a-z/A-Z/;
683       $ENV{"JOB_PARAMETER_$param"} = $value;
684     }
685     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
686     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
687     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
688     $ENV{"HOME"} = $ENV{"TASK_WORK"};
689     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
690     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
691     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
692     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
693
694     $ENV{"GZIP"} = "-n";
695
696     my @srunargs = (
697       "srun",
698       "--nodelist=".$childnode->{name},
699       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
700       "--job-name=$job_id.$id.$$",
701         );
702     my $build_script_to_send = "";
703     my $command =
704         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
705         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
706         ."&& cd $ENV{CRUNCH_TMP} ";
707     if ($build_script)
708     {
709       $build_script_to_send = $build_script;
710       $command .=
711           "&& perl -";
712     }
713     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
714     if ($docker_hash)
715     {
716       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
717       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
718       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
719
720       # Dynamically configure the container to use the host system as its
721       # DNS server.  Get the host's global addresses from the ip command,
722       # and turn them into docker --dns options using gawk.
723       $command .=
724           q{$(ip -o address show scope global |
725               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
726
727       # The source tree and $destdir directory (which we have
728       # installed on the worker host) are available in the container,
729       # under the same path.
730       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
731       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
732
733       # Currently, we make arv-mount's mount point appear at /keep
734       # inside the container (instead of using the same path as the
735       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
736       # crunch scripts and utilities must not rely on this. They must
737       # use $TASK_KEEPMOUNT.
738       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
739       $ENV{TASK_KEEPMOUNT} = "/keep";
740
741       # TASK_WORK is a plain docker data volume: it starts out empty,
742       # is writable, and persists until no containers use it any
743       # more. We don't use --volumes-from to share it with other
744       # containers: it is only accessible to this task, and it goes
745       # away when this task stops.
746       $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
747
748       # JOB_WORK is also a plain docker data volume for now. TODO:
749       # Share a single JOB_WORK volume across all task containers on a
750       # given worker node, and delete it when the job ends (and, in
751       # case that doesn't work, when the next job starts).
752       $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
753
754       while (my ($env_key, $env_val) = each %ENV)
755       {
756         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
757           $command .= "--env=\Q$env_key=$env_val\E ";
758         }
759       }
760       $command .= "--env=\QHOME=$ENV{HOME}\E ";
761       $command .= "\Q$docker_hash\E ";
762       $command .= "stdbuf --output=0 --error=0 ";
763       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
764     } else {
765       # Non-docker run
766       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
767       $command .= "stdbuf --output=0 --error=0 ";
768       $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
769     }
770
771     my @execargs = ('bash', '-c', $command);
772     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
773     # exec() failed, we assume nothing happened.
774     die "srun() failed on build script\n";
775   }
776   close("writer");
777   if (!defined $childpid)
778   {
779     close $reader{$id};
780     delete $reader{$id};
781     next;
782   }
783   shift @freeslot;
784   $proc{$childpid} = { jobstep => $id,
785                        time => time,
786                        slot => $childslot,
787                        jobstepname => "$job_id.$id.$childpid",
788                      };
789   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
790   $slot[$childslot]->{pid} = $childpid;
791
792   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
793   Log ($id, "child $childpid started on $childslotname");
794   $Jobstep->{starttime} = time;
795   $Jobstep->{node} = $childnode->{name};
796   $Jobstep->{slotindex} = $childslot;
797   delete $Jobstep->{stderr};
798   delete $Jobstep->{finishtime};
799
800   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
801   $Jobstep->{'arvados_task'}->save;
802
803   splice @jobstep_todo, $todo_ptr, 1;
804   --$todo_ptr;
805
806   $progress_is_dirty = 1;
807
808   while (!@freeslot
809          ||
810          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
811   {
812     last THISROUND if $main::please_freeze;
813     if ($main::please_info)
814     {
815       $main::please_info = 0;
816       freeze();
817       collate_output();
818       save_meta(1);
819       update_progress_stats();
820     }
821     my $gotsome
822         = readfrompipes ()
823         + reapchildren ();
824     if (!$gotsome)
825     {
826       check_refresh_wanted();
827       check_squeue();
828       update_progress_stats();
829       select (undef, undef, undef, 0.1);
830     }
831     elsif (time - $progress_stats_updated >= 30)
832     {
833       update_progress_stats();
834     }
835     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
836         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
837     {
838       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
839           .($thisround_failed+$thisround_succeeded)
840           .") -- giving up on this round";
841       Log (undef, $message);
842       last THISROUND;
843     }
844
845     # move slots from freeslot to holdslot (or back to freeslot) if necessary
846     for (my $i=$#freeslot; $i>=0; $i--) {
847       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
848         push @holdslot, (splice @freeslot, $i, 1);
849       }
850     }
851     for (my $i=$#holdslot; $i>=0; $i--) {
852       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
853         push @freeslot, (splice @holdslot, $i, 1);
854       }
855     }
856
857     # give up if no nodes are succeeding
858     if (!grep { $_->{node}->{losing_streak} == 0 &&
859                     $_->{node}->{hold_count} < 4 } @slot) {
860       my $message = "Every node has failed -- giving up on this round";
861       Log (undef, $message);
862       last THISROUND;
863     }
864   }
865 }
866
867
868 push @freeslot, splice @holdslot;
869 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
870
871
872 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
873 while (%proc)
874 {
875   if ($main::please_continue) {
876     $main::please_continue = 0;
877     goto THISROUND;
878   }
879   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
880   readfrompipes ();
881   if (!reapchildren())
882   {
883     check_refresh_wanted();
884     check_squeue();
885     update_progress_stats();
886     select (undef, undef, undef, 0.1);
887     killem (keys %proc) if $main::please_freeze;
888   }
889 }
890
891 update_progress_stats();
892 freeze_if_want_freeze();
893
894
895 if (!defined $main::success)
896 {
897   if (@jobstep_todo &&
898       $thisround_succeeded == 0 &&
899       ($thisround_failed == 0 || $thisround_failed > 4))
900   {
901     my $message = "stop because $thisround_failed tasks failed and none succeeded";
902     Log (undef, $message);
903     $main::success = 0;
904   }
905   if (!@jobstep_todo)
906   {
907     $main::success = 1;
908   }
909 }
910
911 goto ONELEVEL if !defined $main::success;
912
913
914 release_allocation();
915 freeze();
916 my $collated_output = &collate_output();
917
918 if (!$collated_output) {
919   Log(undef, "output undef");
920 }
921 else {
922   eval {
923     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
924         or die "failed to get collated manifest: $!";
925     my $orig_manifest_text = '';
926     while (my $manifest_line = <$orig_manifest>) {
927       $orig_manifest_text .= $manifest_line;
928     }
929     my $output = api_call("collections/create", collection => {
930       'manifest_text' => $orig_manifest_text});
931     Log(undef, "output uuid " . $output->{uuid});
932     Log(undef, "output hash " . $output->{portable_data_hash});
933     $Job->update_attributes('output' => $output->{portable_data_hash});
934   };
935   if ($@) {
936     Log (undef, "Failed to register output manifest: $@");
937   }
938 }
939
940 Log (undef, "finish");
941
942 save_meta();
943
944 my $final_state;
945 if ($collated_output && $main::success) {
946   $final_state = 'Complete';
947 } else {
948   $final_state = 'Failed';
949 }
950 $Job->update_attributes('state' => $final_state);
951
952 exit (($final_state eq 'Complete') ? 0 : 1);
953
954
955
956 sub update_progress_stats
957 {
958   $progress_stats_updated = time;
959   return if !$progress_is_dirty;
960   my ($todo, $done, $running) = (scalar @jobstep_todo,
961                                  scalar @jobstep_done,
962                                  scalar @slot - scalar @freeslot - scalar @holdslot);
963   $Job->{'tasks_summary'} ||= {};
964   $Job->{'tasks_summary'}->{'todo'} = $todo;
965   $Job->{'tasks_summary'}->{'done'} = $done;
966   $Job->{'tasks_summary'}->{'running'} = $running;
967   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
968   Log (undef, "status: $done done, $running running, $todo todo");
969   $progress_is_dirty = 0;
970 }
971
972
973
974 sub reapchildren
975 {
976   my $pid = waitpid (-1, WNOHANG);
977   return 0 if $pid <= 0;
978
979   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
980                   . "."
981                   . $slot[$proc{$pid}->{slot}]->{cpu});
982   my $jobstepid = $proc{$pid}->{jobstep};
983   my $elapsed = time - $proc{$pid}->{time};
984   my $Jobstep = $jobstep[$jobstepid];
985
986   my $childstatus = $?;
987   my $exitvalue = $childstatus >> 8;
988   my $exitinfo = "exit ".exit_status_s($childstatus);
989   $Jobstep->{'arvados_task'}->reload;
990   my $task_success = $Jobstep->{'arvados_task'}->{success};
991
992   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
993
994   if (!defined $task_success) {
995     # task did not indicate one way or the other --> fail
996     $Jobstep->{'arvados_task'}->{success} = 0;
997     $Jobstep->{'arvados_task'}->save;
998     $task_success = 0;
999   }
1000
1001   if (!$task_success)
1002   {
1003     my $temporary_fail;
1004     $temporary_fail ||= $Jobstep->{node_fail};
1005     $temporary_fail ||= ($exitvalue == 111);
1006
1007     ++$thisround_failed;
1008     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1009
1010     # Check for signs of a failed or misconfigured node
1011     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1012         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1013       # Don't count this against jobstep failure thresholds if this
1014       # node is already suspected faulty and srun exited quickly
1015       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1016           $elapsed < 5) {
1017         Log ($jobstepid, "blaming failure on suspect node " .
1018              $slot[$proc{$pid}->{slot}]->{node}->{name});
1019         $temporary_fail ||= 1;
1020       }
1021       ban_node_by_slot($proc{$pid}->{slot});
1022     }
1023
1024     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1025                              ++$Jobstep->{'failures'},
1026                              $temporary_fail ? 'temporary ' : 'permanent',
1027                              $elapsed));
1028
1029     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1030       # Give up on this task, and the whole job
1031       $main::success = 0;
1032       $main::please_freeze = 1;
1033     }
1034     # Put this task back on the todo queue
1035     push @jobstep_todo, $jobstepid;
1036     $Job->{'tasks_summary'}->{'failed'}++;
1037   }
1038   else
1039   {
1040     ++$thisround_succeeded;
1041     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1042     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1043     push @jobstep_done, $jobstepid;
1044     Log ($jobstepid, "success in $elapsed seconds");
1045   }
1046   $Jobstep->{exitcode} = $childstatus;
1047   $Jobstep->{finishtime} = time;
1048   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1049   $Jobstep->{'arvados_task'}->save;
1050   process_stderr ($jobstepid, $task_success);
1051   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1052
1053   close $reader{$jobstepid};
1054   delete $reader{$jobstepid};
1055   delete $slot[$proc{$pid}->{slot}]->{pid};
1056   push @freeslot, $proc{$pid}->{slot};
1057   delete $proc{$pid};
1058
1059   if ($task_success) {
1060     # Load new tasks
1061     my $newtask_list = [];
1062     my $newtask_results;
1063     do {
1064       $newtask_results = api_call(
1065         "job_tasks/list",
1066         'where' => {
1067           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1068         },
1069         'order' => 'qsequence',
1070         'offset' => scalar(@$newtask_list),
1071       );
1072       push(@$newtask_list, @{$newtask_results->{items}});
1073     } while (@{$newtask_results->{items}});
1074     foreach my $arvados_task (@$newtask_list) {
1075       my $jobstep = {
1076         'level' => $arvados_task->{'sequence'},
1077         'failures' => 0,
1078         'arvados_task' => $arvados_task
1079       };
1080       push @jobstep, $jobstep;
1081       push @jobstep_todo, $#jobstep;
1082     }
1083   }
1084
1085   $progress_is_dirty = 1;
1086   1;
1087 }
1088
1089 sub check_refresh_wanted
1090 {
1091   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1092   if (@stat && $stat[9] > $latest_refresh) {
1093     $latest_refresh = scalar time;
1094     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1095     for my $attr ('cancelled_at',
1096                   'cancelled_by_user_uuid',
1097                   'cancelled_by_client_uuid',
1098                   'state') {
1099       $Job->{$attr} = $Job2->{$attr};
1100     }
1101     if ($Job->{'state'} ne "Running") {
1102       if ($Job->{'state'} eq "Cancelled") {
1103         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1104       } else {
1105         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1106       }
1107       $main::success = 0;
1108       $main::please_freeze = 1;
1109     }
1110   }
1111 }
1112
1113 sub check_squeue
1114 {
1115   # return if the kill list was checked <4 seconds ago
1116   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1117   {
1118     return;
1119   }
1120   $squeue_kill_checked = time;
1121
1122   # use killem() on procs whose killtime is reached
1123   for (keys %proc)
1124   {
1125     if (exists $proc{$_}->{killtime}
1126         && $proc{$_}->{killtime} <= time)
1127     {
1128       killem ($_);
1129     }
1130   }
1131
1132   # return if the squeue was checked <60 seconds ago
1133   if (defined $squeue_checked && $squeue_checked > time - 60)
1134   {
1135     return;
1136   }
1137   $squeue_checked = time;
1138
1139   if (!$have_slurm)
1140   {
1141     # here is an opportunity to check for mysterious problems with local procs
1142     return;
1143   }
1144
1145   # get a list of steps still running
1146   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1147   chop @squeue;
1148   if ($squeue[-1] ne "ok")
1149   {
1150     return;
1151   }
1152   pop @squeue;
1153
1154   # which of my jobsteps are running, according to squeue?
1155   my %ok;
1156   foreach (@squeue)
1157   {
1158     if (/^(\d+)\.(\d+) (\S+)/)
1159     {
1160       if ($1 eq $ENV{SLURM_JOBID})
1161       {
1162         $ok{$3} = 1;
1163       }
1164     }
1165   }
1166
1167   # which of my active child procs (>60s old) were not mentioned by squeue?
1168   foreach (keys %proc)
1169   {
1170     if ($proc{$_}->{time} < time - 60
1171         && !exists $ok{$proc{$_}->{jobstepname}}
1172         && !exists $proc{$_}->{killtime})
1173     {
1174       # kill this proc if it hasn't exited in 30 seconds
1175       $proc{$_}->{killtime} = time + 30;
1176     }
1177   }
1178 }
1179
1180
1181 sub release_allocation
1182 {
1183   if ($have_slurm)
1184   {
1185     Log (undef, "release job allocation");
1186     system "scancel $ENV{SLURM_JOBID}";
1187   }
1188 }
1189
1190
1191 sub readfrompipes
1192 {
1193   my $gotsome = 0;
1194   foreach my $job (keys %reader)
1195   {
1196     my $buf;
1197     while (0 < sysread ($reader{$job}, $buf, 8192))
1198     {
1199       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1200       $jobstep[$job]->{stderr} .= $buf;
1201       preprocess_stderr ($job);
1202       if (length ($jobstep[$job]->{stderr}) > 16384)
1203       {
1204         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1205       }
1206       $gotsome = 1;
1207     }
1208   }
1209   return $gotsome;
1210 }
1211
1212
1213 sub preprocess_stderr
1214 {
1215   my $job = shift;
1216
1217   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1218     my $line = $1;
1219     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1220     Log ($job, "stderr $line");
1221     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1222       # whoa.
1223       $main::please_freeze = 1;
1224     }
1225     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1226       $jobstep[$job]->{node_fail} = 1;
1227       ban_node_by_slot($jobstep[$job]->{slotindex});
1228     }
1229   }
1230 }
1231
1232
1233 sub process_stderr
1234 {
1235   my $job = shift;
1236   my $task_success = shift;
1237   preprocess_stderr ($job);
1238
1239   map {
1240     Log ($job, "stderr $_");
1241   } split ("\n", $jobstep[$job]->{stderr});
1242 }
1243
1244 sub fetch_block
1245 {
1246   my $hash = shift;
1247   my ($keep, $child_out, $output_block);
1248
1249   my $cmd = "arv-get \Q$hash\E";
1250   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1251   $output_block = '';
1252   while (1) {
1253     my $buf;
1254     my $bytes = sysread($keep, $buf, 1024 * 1024);
1255     if (!defined $bytes) {
1256       die "reading from arv-get: $!";
1257     } elsif ($bytes == 0) {
1258       # sysread returns 0 at the end of the pipe.
1259       last;
1260     } else {
1261       # some bytes were read into buf.
1262       $output_block .= $buf;
1263     }
1264   }
1265   close $keep;
1266   return $output_block;
1267 }
1268
1269 sub collate_output
1270 {
1271   Log (undef, "collate");
1272
1273   my ($child_out, $child_in);
1274   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1275                   '--retries', retry_count());
1276   my $joboutput;
1277   for (@jobstep)
1278   {
1279     next if (!exists $_->{'arvados_task'}->{'output'} ||
1280              !$_->{'arvados_task'}->{'success'});
1281     my $output = $_->{'arvados_task'}->{output};
1282     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1283     {
1284       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1285       print $child_in $output;
1286     }
1287     elsif (@jobstep == 1)
1288     {
1289       $joboutput = $output;
1290       last;
1291     }
1292     elsif (defined (my $outblock = fetch_block ($output)))
1293     {
1294       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1295       print $child_in $outblock;
1296     }
1297     else
1298     {
1299       Log (undef, "XXX fetch_block($output) failed XXX");
1300       $main::success = 0;
1301     }
1302   }
1303   $child_in->close;
1304
1305   if (!defined $joboutput) {
1306     my $s = IO::Select->new($child_out);
1307     if ($s->can_read(120)) {
1308       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1309       chomp($joboutput);
1310       # TODO: Ensure exit status == 0.
1311     } else {
1312       Log (undef, "timed out reading from 'arv-put'");
1313     }
1314   }
1315   # TODO: kill $pid instead of waiting, now that we've decided to
1316   # ignore further output.
1317   waitpid($pid, 0);
1318
1319   return $joboutput;
1320 }
1321
1322
1323 sub killem
1324 {
1325   foreach (@_)
1326   {
1327     my $sig = 2;                # SIGINT first
1328     if (exists $proc{$_}->{"sent_$sig"} &&
1329         time - $proc{$_}->{"sent_$sig"} > 4)
1330     {
1331       $sig = 15;                # SIGTERM if SIGINT doesn't work
1332     }
1333     if (exists $proc{$_}->{"sent_$sig"} &&
1334         time - $proc{$_}->{"sent_$sig"} > 4)
1335     {
1336       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1337     }
1338     if (!exists $proc{$_}->{"sent_$sig"})
1339     {
1340       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1341       kill $sig, $_;
1342       select (undef, undef, undef, 0.1);
1343       if ($sig == 2)
1344       {
1345         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1346       }
1347       $proc{$_}->{"sent_$sig"} = time;
1348       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1349     }
1350   }
1351 }
1352
1353
1354 sub fhbits
1355 {
1356   my($bits);
1357   for (@_) {
1358     vec($bits,fileno($_),1) = 1;
1359   }
1360   $bits;
1361 }
1362
1363
1364 # Send log output to Keep via arv-put.
1365 #
1366 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1367 # $log_pipe_pid is the pid of the arv-put subprocess.
1368 #
1369 # The only functions that should access these variables directly are:
1370 #
1371 # log_writer_start($logfilename)
1372 #     Starts an arv-put pipe, reading data on stdin and writing it to
1373 #     a $logfilename file in an output collection.
1374 #
1375 # log_writer_send($txt)
1376 #     Writes $txt to the output log collection.
1377 #
1378 # log_writer_finish()
1379 #     Closes the arv-put pipe and returns the output that it produces.
1380 #
1381 # log_writer_is_active()
1382 #     Returns a true value if there is currently a live arv-put
1383 #     process, false otherwise.
1384 #
1385 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1386
1387 sub log_writer_start($)
1388 {
1389   my $logfilename = shift;
1390   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1391                         'arv-put', '--portable-data-hash',
1392                         '--retries', '3',
1393                         '--filename', $logfilename,
1394                         '-');
1395 }
1396
1397 sub log_writer_send($)
1398 {
1399   my $txt = shift;
1400   print $log_pipe_in $txt;
1401 }
1402
1403 sub log_writer_finish()
1404 {
1405   return unless $log_pipe_pid;
1406
1407   close($log_pipe_in);
1408   my $arv_put_output;
1409
1410   my $s = IO::Select->new($log_pipe_out);
1411   if ($s->can_read(120)) {
1412     sysread($log_pipe_out, $arv_put_output, 1024);
1413     chomp($arv_put_output);
1414   } else {
1415     Log (undef, "timed out reading from 'arv-put'");
1416   }
1417
1418   waitpid($log_pipe_pid, 0);
1419   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1420   if ($?) {
1421     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1422   }
1423
1424   return $arv_put_output;
1425 }
1426
1427 sub log_writer_is_active() {
1428   return $log_pipe_pid;
1429 }
1430
1431 sub Log                         # ($jobstep_id, $logmessage)
1432 {
1433   if ($_[1] =~ /\n/) {
1434     for my $line (split (/\n/, $_[1])) {
1435       Log ($_[0], $line);
1436     }
1437     return;
1438   }
1439   my $fh = select STDERR; $|=1; select $fh;
1440   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1441   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1442   $message .= "\n";
1443   my $datetime;
1444   if (log_writer_is_active() || -t STDERR) {
1445     my @gmtime = gmtime;
1446     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1447                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1448   }
1449   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1450
1451   if (log_writer_is_active()) {
1452     log_writer_send($datetime . " " . $message);
1453   }
1454 }
1455
1456
1457 sub croak
1458 {
1459   my ($package, $file, $line) = caller;
1460   my $message = "@_ at $file line $line\n";
1461   Log (undef, $message);
1462   freeze() if @jobstep_todo;
1463   collate_output() if @jobstep_todo;
1464   cleanup();
1465   save_meta();
1466   die;
1467 }
1468
1469
1470 sub cleanup
1471 {
1472   return unless $Job;
1473   if ($Job->{'state'} eq 'Cancelled') {
1474     $Job->update_attributes('finished_at' => scalar gmtime);
1475   } else {
1476     $Job->update_attributes('state' => 'Failed');
1477   }
1478 }
1479
1480
1481 sub save_meta
1482 {
1483   my $justcheckpoint = shift; # false if this will be the last meta saved
1484   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1485   return unless log_writer_is_active();
1486
1487   my $loglocator = log_writer_finish();
1488   Log (undef, "log manifest is $loglocator");
1489   $Job->{'log'} = $loglocator;
1490   $Job->update_attributes('log', $loglocator);
1491 }
1492
1493
1494 sub freeze_if_want_freeze
1495 {
1496   if ($main::please_freeze)
1497   {
1498     release_allocation();
1499     if (@_)
1500     {
1501       # kill some srun procs before freeze+stop
1502       map { $proc{$_} = {} } @_;
1503       while (%proc)
1504       {
1505         killem (keys %proc);
1506         select (undef, undef, undef, 0.1);
1507         my $died;
1508         while (($died = waitpid (-1, WNOHANG)) > 0)
1509         {
1510           delete $proc{$died};
1511         }
1512       }
1513     }
1514     freeze();
1515     collate_output();
1516     cleanup();
1517     save_meta();
1518     exit 1;
1519   }
1520 }
1521
1522
1523 sub freeze
1524 {
1525   Log (undef, "Freeze not implemented");
1526   return;
1527 }
1528
1529
1530 sub thaw
1531 {
1532   croak ("Thaw not implemented");
1533 }
1534
1535
1536 sub freezequote
1537 {
1538   my $s = shift;
1539   $s =~ s/\\/\\\\/g;
1540   $s =~ s/\n/\\n/g;
1541   return $s;
1542 }
1543
1544
1545 sub freezeunquote
1546 {
1547   my $s = shift;
1548   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1549   return $s;
1550 }
1551
1552
1553 sub srun
1554 {
1555   my $srunargs = shift;
1556   my $execargs = shift;
1557   my $opts = shift || {};
1558   my $stdin = shift;
1559   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1560
1561   $Data::Dumper::Terse = 1;
1562   $Data::Dumper::Indent = 0;
1563   my $show_cmd = Dumper($args);
1564   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1565   $show_cmd =~ s/\n/ /g;
1566   warn "starting: $show_cmd\n";
1567
1568   if (defined $stdin) {
1569     my $child = open STDIN, "-|";
1570     defined $child or die "no fork: $!";
1571     if ($child == 0) {
1572       print $stdin or die $!;
1573       close STDOUT or die $!;
1574       exit 0;
1575     }
1576   }
1577
1578   return system (@$args) if $opts->{fork};
1579
1580   exec @$args;
1581   warn "ENV size is ".length(join(" ",%ENV));
1582   die "exec failed: $!: @$args";
1583 }
1584
1585
1586 sub ban_node_by_slot {
1587   # Don't start any new jobsteps on this node for 60 seconds
1588   my $slotid = shift;
1589   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1590   $slot[$slotid]->{node}->{hold_count}++;
1591   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1592 }
1593
1594 sub must_lock_now
1595 {
1596   my ($lockfile, $error_message) = @_;
1597   open L, ">", $lockfile or croak("$lockfile: $!");
1598   if (!flock L, LOCK_EX|LOCK_NB) {
1599     croak("Can't lock $lockfile: $error_message\n");
1600   }
1601 }
1602
1603 sub find_docker_image {
1604   # Given a Keep locator, check to see if it contains a Docker image.
1605   # If so, return its stream name and Docker hash.
1606   # If not, return undef for both values.
1607   my $locator = shift;
1608   my ($streamname, $filename);
1609   my $image = api_call("collections/get", uuid => $locator);
1610   if ($image) {
1611     foreach my $line (split(/\n/, $image->{manifest_text})) {
1612       my @tokens = split(/\s+/, $line);
1613       next if (!@tokens);
1614       $streamname = shift(@tokens);
1615       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1616         if (defined($filename)) {
1617           return (undef, undef);  # More than one file in the Collection.
1618         } else {
1619           $filename = (split(/:/, $filedata, 3))[2];
1620         }
1621       }
1622     }
1623   }
1624   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1625     return ($streamname, $1);
1626   } else {
1627     return (undef, undef);
1628   }
1629 }
1630
1631 sub retry_count {
1632   # Calculate the number of times an operation should be retried,
1633   # assuming exponential backoff, and that we're willing to retry as
1634   # long as tasks have been running.  Enforce a minimum of 3 retries.
1635   my ($starttime, $endtime, $timediff, $retries);
1636   if (@jobstep) {
1637     $starttime = $jobstep[0]->{starttime};
1638     $endtime = $jobstep[-1]->{finishtime};
1639   }
1640   if (!defined($starttime)) {
1641     $timediff = 0;
1642   } elsif (!defined($endtime)) {
1643     $timediff = time - $starttime;
1644   } else {
1645     $timediff = ($endtime - $starttime) - (time - $endtime);
1646   }
1647   if ($timediff > 0) {
1648     $retries = int(log($timediff) / log(2));
1649   } else {
1650     $retries = 1;  # Use the minimum.
1651   }
1652   return ($retries > 3) ? $retries : 3;
1653 }
1654
1655 sub retry_op {
1656   # Pass in two function references.
1657   # This method will be called with the remaining arguments.
1658   # If it dies, retry it with exponential backoff until it succeeds,
1659   # or until the current retry_count is exhausted.  After each failure
1660   # that can be retried, the second function will be called with
1661   # the current try count (0-based) and error message.
1662   my $operation = shift;
1663   my $retry_callback = shift;
1664   my $retries = retry_count();
1665   foreach my $try_count (0..$retries) {
1666     my $next_try = time + (2 ** $try_count);
1667     my $result = eval { $operation->(@_); };
1668     if (!$@) {
1669       return $result;
1670     } elsif ($try_count < $retries) {
1671       $retry_callback->($try_count, $@);
1672       my $sleep_time = $next_try - time;
1673       sleep($sleep_time) if ($sleep_time > 0);
1674     }
1675   }
1676   # Ensure the error message ends in a newline, so Perl doesn't add
1677   # retry_op's line number to it.
1678   chomp($@);
1679   die($@ . "\n");
1680 }
1681
1682 sub api_call {
1683   # Pass in a /-separated API method name, and arguments for it.
1684   # This function will call that method, retrying as needed until
1685   # the current retry_count is exhausted, with a log on the first failure.
1686   my $method_name = shift;
1687   my $log_api_retry = sub {
1688     my ($try_count, $errmsg) = @_;
1689     if ($try_count == 0) {
1690       Log(undef, "API method $method_name failed: $errmsg. Retrying.");
1691     };
1692   };
1693   my $method = $arv;
1694   foreach my $key (split(/\//, $method_name)) {
1695     $method = $method->{$key};
1696   }
1697   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1698 }
1699
1700 sub exit_status_s {
1701   # Given a $?, return a human-readable exit code string like "0" or
1702   # "1" or "0 with signal 1" or "1 with signal 11".
1703   my $exitcode = shift;
1704   my $s = $exitcode >> 8;
1705   if ($exitcode & 0x7f) {
1706     $s .= " with signal " . ($exitcode & 0x7f);
1707   }
1708   if ($exitcode & 0x80) {
1709     $s .= " with core dump";
1710   }
1711   return $s;
1712 }
1713
1714 __DATA__
1715 #!/usr/bin/perl
1716
1717 # checkout-and-build
1718
1719 use Fcntl ':flock';
1720 use File::Path qw( make_path remove_tree );
1721
1722 my $destdir = $ENV{"CRUNCH_SRC"};
1723 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1724 my $repo = $ENV{"CRUNCH_SRC_URL"};
1725 my $task_work = $ENV{"TASK_WORK"};
1726
1727 for my $dir ($destdir, $task_work) {
1728   if ($dir) {
1729     make_path $dir;
1730     -e $dir or die "Failed to create temporary directory ($dir): $!";
1731   }
1732 }
1733
1734 if ($task_work) {
1735   remove_tree($task_work, {keep_root => 1});
1736 }
1737
1738
1739 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1740 flock L, LOCK_EX;
1741 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1742     if (@ARGV) {
1743         exec(@ARGV);
1744         die "Cannot exec `@ARGV`: $!";
1745     } else {
1746         exit 0;
1747     }
1748 }
1749
1750 unlink "$destdir.commit";
1751 open STDERR_ORIG, ">&STDERR";
1752 open STDOUT, ">", "$destdir.log";
1753 open STDERR, ">&STDOUT";
1754
1755 mkdir $destdir;
1756 my @git_archive_data = <DATA>;
1757 if (@git_archive_data) {
1758   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1759   print TARX @git_archive_data;
1760   if(!close(TARX)) {
1761     die "'tar -C $destdir -xf -' exited $?: $!";
1762   }
1763 }
1764
1765 my $pwd;
1766 chomp ($pwd = `pwd`);
1767 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1768 mkdir $install_dir;
1769
1770 for my $src_path ("$destdir/arvados/sdk/python") {
1771   if (-d $src_path) {
1772     shell_or_die ("virtualenv", $install_dir);
1773     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1774   }
1775 }
1776
1777 if (-e "$destdir/crunch_scripts/install") {
1778     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1779 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1780     # Old version
1781     shell_or_die ("./tests/autotests.sh", $install_dir);
1782 } elsif (-e "./install.sh") {
1783     shell_or_die ("./install.sh", $install_dir);
1784 }
1785
1786 if ($commit) {
1787     unlink "$destdir.commit.new";
1788     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1789     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1790 }
1791
1792 close L;
1793
1794 if (@ARGV) {
1795     exec(@ARGV);
1796     die "Cannot exec `@ARGV`: $!";
1797 } else {
1798     exit 0;
1799 }
1800
1801 sub shell_or_die
1802 {
1803   if ($ENV{"DEBUG"}) {
1804     print STDERR "@_\n";
1805   }
1806   if (system (@_) != 0) {
1807     my $err = $!;
1808     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1809     open STDERR, ">&STDERR_ORIG";
1810     system ("cat $destdir.log >&2");
1811     die "@_ failed ($err): $exitstatus";
1812   }
1813 }
1814
1815 __DATA__