4621: code review feedback
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = api_call("users/current");
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = api_call("jobs/get", uuid => $jobspec);
166   if (!$force_unlock) {
167     # Claim this job, and make sure nobody else does
168     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = api_call("jobs/create", job => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = api_call("job_tasks/create", job_task => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324   });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338
339 my $build_script;
340 do {
341   local $/ = undef;
342   $build_script = <DATA>;
343 };
344 my $nodelist = join(",", @node);
345
346 if (!defined $no_clear_tmp) {
347   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
348   Log (undef, "Clean work dirs");
349
350   my $cleanpid = fork();
351   if ($cleanpid == 0)
352   {
353     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
354           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
355     exit (1);
356   }
357   while (1)
358   {
359     last if $cleanpid == waitpid (-1, WNOHANG);
360     freeze_if_want_freeze ($cleanpid);
361     select (undef, undef, undef, 0.1);
362   }
363   Log (undef, "Cleanup command exited ".exit_status_s($?));
364 }
365
366
367 my $git_archive;
368 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
369   # If script_version looks like an absolute path, *and* the --git-dir
370   # argument was not given -- which implies we were not invoked by
371   # crunch-dispatch -- we will use the given path as a working
372   # directory instead of resolving script_version to a git commit (or
373   # doing anything else with git).
374   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
375   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
376 }
377 else {
378   # Resolve the given script_version to a git commit sha1. Also, if
379   # the repository is remote, clone it into our local filesystem: this
380   # ensures "git archive" will work, and is necessary to reliably
381   # resolve a symbolic script_version like "master^".
382   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
383
384   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
385
386   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
387
388   # If we're running under crunch-dispatch, it will have already
389   # pulled the appropriate source tree into its own repository, and
390   # given us that repo's path as $git_dir.
391   #
392   # If we're running a "local" job, we might have to fetch content
393   # from a remote repository.
394   #
395   # (Currently crunch-dispatch gives a local path with --git-dir, but
396   # we might as well accept URLs there too in case it changes its
397   # mind.)
398   my $repo = $git_dir || $Job->{'repository'};
399
400   # Repository can be remote or local. If remote, we'll need to fetch it
401   # to a local dir before doing `git log` et al.
402   my $repo_location;
403
404   if ($repo =~ m{://|^[^/]*:}) {
405     # $repo is a git url we can clone, like git:// or https:// or
406     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
407     # not recognized here because distinguishing that from a local
408     # path is too fragile. If you really need something strange here,
409     # use the ssh:// form.
410     $repo_location = 'remote';
411   } elsif ($repo =~ m{^\.*/}) {
412     # $repo is a local path to a git index. We'll also resolve ../foo
413     # to ../foo/.git if the latter is a directory. To help
414     # disambiguate local paths from named hosted repositories, this
415     # form must be given as ./ or ../ if it's a relative path.
416     if (-d "$repo/.git") {
417       $repo = "$repo/.git";
418     }
419     $repo_location = 'local';
420   } else {
421     # $repo is none of the above. It must be the name of a hosted
422     # repository.
423     my $arv_repo_list = api_call("repositories/list",
424                                  'filters' => [['name','=',$repo]]);
425     my @repos_found = @{$arv_repo_list->{'items'}};
426     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
427     if ($n_found > 0) {
428       Log(undef, "Repository '$repo' -> "
429           . join(", ", map { $_->{'uuid'} } @repos_found));
430     }
431     if ($n_found != 1) {
432       croak("Error: Found $n_found repositories with name '$repo'.");
433     }
434     $repo = $repos_found[0]->{'fetch_url'};
435     $repo_location = 'remote';
436   }
437   Log(undef, "Using $repo_location repository '$repo'");
438   $ENV{"CRUNCH_SRC_URL"} = $repo;
439
440   # Resolve given script_version (we'll call that $treeish here) to a
441   # commit sha1 ($commit).
442   my $treeish = $Job->{'script_version'};
443   my $commit;
444   if ($repo_location eq 'remote') {
445     # We minimize excess object-fetching by re-using the same bare
446     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
447     # just keep adding remotes to it as needed.
448     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
449     my $gitcmd = "git --git-dir=\Q$local_repo\E";
450
451     # Set up our local repo for caching remote objects, making
452     # archives, etc.
453     if (!-d $local_repo) {
454       make_path($local_repo) or croak("Error: could not create $local_repo");
455     }
456     # This works (exits 0 and doesn't delete fetched objects) even
457     # if $local_repo is already initialized:
458     `$gitcmd init --bare`;
459     if ($?) {
460       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
461     }
462
463     # If $treeish looks like a hash (or abbrev hash) we look it up in
464     # our local cache first, since that's cheaper. (We don't want to
465     # do that with tags/branches though -- those change over time, so
466     # they should always be resolved by the remote repo.)
467     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
468       # Hide stderr because it's normal for this to fail:
469       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
470       if ($? == 0 &&
471           # Careful not to resolve a branch named abcdeff to commit 1234567:
472           $sha1 =~ /^$treeish/ &&
473           $sha1 =~ /^([0-9a-f]{40})$/s) {
474         $commit = $1;
475         Log(undef, "Commit $commit already present in $local_repo");
476       }
477     }
478
479     if (!defined $commit) {
480       # If $treeish isn't just a hash or abbrev hash, or isn't here
481       # yet, we need to fetch the remote to resolve it correctly.
482
483       # First, remove all local heads. This prevents a name that does
484       # not exist on the remote from resolving to (or colliding with)
485       # a previously fetched branch or tag (possibly from a different
486       # remote).
487       remove_tree("$local_repo/refs/heads", {keep_root => 1});
488
489       Log(undef, "Fetching objects from $repo to $local_repo");
490       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
491       if ($?) {
492         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
493       }
494     }
495
496     # Now that the data is all here, we will use our local repo for
497     # the rest of our git activities.
498     $repo = $local_repo;
499   }
500
501   my $gitcmd = "git --git-dir=\Q$repo\E";
502   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
503   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
504     croak("`$gitcmd rev-list` exited "
505           .exit_status_s($?)
506           .", '$treeish' not found. Giving up.");
507   }
508   $commit = $1;
509   Log(undef, "Version $treeish is commit $commit");
510
511   if ($commit ne $Job->{'script_version'}) {
512     # Record the real commit id in the database, frozentokey, logs,
513     # etc. -- instead of an abbreviation or a branch name which can
514     # become ambiguous or point to a different commit in the future.
515     if (!$Job->update_attributes('script_version' => $commit)) {
516       croak("Error: failed to update job's script_version attribute");
517     }
518   }
519
520   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
521   $git_archive = `$gitcmd archive ''\Q$commit\E`;
522   if ($?) {
523     croak("Error: $gitcmd archive exited ".exit_status_s($?));
524   }
525 }
526
527 if (!defined $git_archive) {
528   Log(undef, "Skip install phase (no git archive)");
529   if ($have_slurm) {
530     Log(undef, "Warning: This probably means workers have no source tree!");
531   }
532 }
533 else {
534   Log(undef, "Run install script on all workers");
535
536   my @srunargs = ("srun",
537                   "--nodelist=$nodelist",
538                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
539   my @execargs = ("sh", "-c",
540                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
541
542   my $installpid = fork();
543   if ($installpid == 0)
544   {
545     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
546     exit (1);
547   }
548   while (1)
549   {
550     last if $installpid == waitpid (-1, WNOHANG);
551     freeze_if_want_freeze ($installpid);
552     select (undef, undef, undef, 0.1);
553   }
554   my $install_exited = $?;
555   Log (undef, "Install script exited ".exit_status_s($install_exited));
556   exit (1) if $install_exited != 0;
557 }
558
559 if (!$have_slurm)
560 {
561   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
562   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
563 }
564
565 # If this job requires a Docker image, install that.
566 my $docker_bin = "/usr/bin/docker.io";
567 my ($docker_locator, $docker_stream, $docker_hash);
568 if ($docker_locator = $Job->{docker_image_locator}) {
569   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
570   if (!$docker_hash)
571   {
572     croak("No Docker image hash found from locator $docker_locator");
573   }
574   $docker_stream =~ s/^\.//;
575   my $docker_install_script = qq{
576 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
577     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
578 fi
579 };
580   my $docker_pid = fork();
581   if ($docker_pid == 0)
582   {
583     srun (["srun", "--nodelist=" . join(',', @node)],
584           ["/bin/sh", "-ec", $docker_install_script]);
585     exit ($?);
586   }
587   while (1)
588   {
589     last if $docker_pid == waitpid (-1, WNOHANG);
590     freeze_if_want_freeze ($docker_pid);
591     select (undef, undef, undef, 0.1);
592   }
593   if ($? != 0)
594   {
595     croak("Installing Docker image from $docker_locator exited "
596           .exit_status_s($?));
597   }
598 }
599
600 foreach (qw (script script_version script_parameters runtime_constraints))
601 {
602   Log (undef,
603        "$_ " .
604        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
605 }
606 foreach (split (/\n/, $Job->{knobs}))
607 {
608   Log (undef, "knob " . $_);
609 }
610
611
612
613 $main::success = undef;
614
615
616
617 ONELEVEL:
618
619 my $thisround_succeeded = 0;
620 my $thisround_failed = 0;
621 my $thisround_failed_multiple = 0;
622
623 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
624                        or $a <=> $b } @jobstep_todo;
625 my $level = $jobstep[$jobstep_todo[0]]->{level};
626 Log (undef, "start level $level");
627
628
629
630 my %proc;
631 my @freeslot = (0..$#slot);
632 my @holdslot;
633 my %reader;
634 my $progress_is_dirty = 1;
635 my $progress_stats_updated = 0;
636
637 update_progress_stats();
638
639
640
641 THISROUND:
642 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
643 {
644   my $id = $jobstep_todo[$todo_ptr];
645   my $Jobstep = $jobstep[$id];
646   if ($Jobstep->{level} != $level)
647   {
648     next;
649   }
650
651   pipe $reader{$id}, "writer" or croak ($!);
652   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
653   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
654
655   my $childslot = $freeslot[0];
656   my $childnode = $slot[$childslot]->{node};
657   my $childslotname = join (".",
658                             $slot[$childslot]->{node}->{name},
659                             $slot[$childslot]->{cpu});
660   my $childpid = fork();
661   if ($childpid == 0)
662   {
663     $SIG{'INT'} = 'DEFAULT';
664     $SIG{'QUIT'} = 'DEFAULT';
665     $SIG{'TERM'} = 'DEFAULT';
666
667     foreach (values (%reader))
668     {
669       close($_);
670     }
671     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
672     open(STDOUT,">&writer");
673     open(STDERR,">&writer");
674
675     undef $dbh;
676     undef $sth;
677
678     delete $ENV{"GNUPGHOME"};
679     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
680     $ENV{"TASK_QSEQUENCE"} = $id;
681     $ENV{"TASK_SEQUENCE"} = $level;
682     $ENV{"JOB_SCRIPT"} = $Job->{script};
683     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
684       $param =~ tr/a-z/A-Z/;
685       $ENV{"JOB_PARAMETER_$param"} = $value;
686     }
687     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
688     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
689     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
690     $ENV{"HOME"} = $ENV{"TASK_WORK"};
691     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
692     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
693     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
694     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
695
696     $ENV{"GZIP"} = "-n";
697
698     my @srunargs = (
699       "srun",
700       "--nodelist=".$childnode->{name},
701       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
702       "--job-name=$job_id.$id.$$",
703         );
704     my $command =
705         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
706         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
707         ."&& cd $ENV{CRUNCH_TMP} ";
708     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
709     if ($docker_hash)
710     {
711       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
712       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
713       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
714
715       # Dynamically configure the container to use the host system as its
716       # DNS server.  Get the host's global addresses from the ip command,
717       # and turn them into docker --dns options using gawk.
718       $command .=
719           q{$(ip -o address show scope global |
720               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
721
722       # The source tree and $destdir directory (which we have
723       # installed on the worker host) are available in the container,
724       # under the same path.
725       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
726       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
727
728       # Currently, we make arv-mount's mount point appear at /keep
729       # inside the container (instead of using the same path as the
730       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
731       # crunch scripts and utilities must not rely on this. They must
732       # use $TASK_KEEPMOUNT.
733       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
734       $ENV{TASK_KEEPMOUNT} = "/keep";
735
736       # TASK_WORK is almost exactly like a docker data volume: it
737       # starts out empty, is writable, and persists until no
738       # containers use it any more. We don't use --volumes-from to
739       # share it with other containers: it is only accessible to this
740       # task, and it goes away when this task stops.
741       #
742       # However, a docker data volume is writable only by root unless
743       # the mount point already happens to exist in the container with
744       # different permissions. Therefore, we [1] assume /tmp already
745       # exists in the image and is writable by the crunch user; [2]
746       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
747       # writable if they are created by docker while setting up the
748       # other --volumes); and [3] create $TASK_WORK inside the
749       # container using $build_script.
750       $command .= "--volume=/tmp ";
751       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
752       $ENV{"HOME"} = $ENV{"TASK_WORK"};
753       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
754
755       # TODO: Share a single JOB_WORK volume across all task
756       # containers on a given worker node, and delete it when the job
757       # ends (and, in case that doesn't work, when the next job
758       # starts).
759       #
760       # For now, use the same approach as TASK_WORK above.
761       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
762
763       while (my ($env_key, $env_val) = each %ENV)
764       {
765         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
766           $command .= "--env=\Q$env_key=$env_val\E ";
767         }
768       }
769       $command .= "--env=\QHOME=$ENV{HOME}\E ";
770       $command .= "\Q$docker_hash\E ";
771       $command .= "stdbuf --output=0 --error=0 ";
772       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
773     } else {
774       # Non-docker run
775       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
776       $command .= "stdbuf --output=0 --error=0 ";
777       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
778     }
779
780     my @execargs = ('bash', '-c', $command);
781     srun (\@srunargs, \@execargs, undef, $build_script);
782     # exec() failed, we assume nothing happened.
783     die "srun() failed on build script\n";
784   }
785   close("writer");
786   if (!defined $childpid)
787   {
788     close $reader{$id};
789     delete $reader{$id};
790     next;
791   }
792   shift @freeslot;
793   $proc{$childpid} = { jobstep => $id,
794                        time => time,
795                        slot => $childslot,
796                        jobstepname => "$job_id.$id.$childpid",
797                      };
798   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
799   $slot[$childslot]->{pid} = $childpid;
800
801   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
802   Log ($id, "child $childpid started on $childslotname");
803   $Jobstep->{starttime} = time;
804   $Jobstep->{node} = $childnode->{name};
805   $Jobstep->{slotindex} = $childslot;
806   delete $Jobstep->{stderr};
807   delete $Jobstep->{finishtime};
808
809   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
810   $Jobstep->{'arvados_task'}->save;
811
812   splice @jobstep_todo, $todo_ptr, 1;
813   --$todo_ptr;
814
815   $progress_is_dirty = 1;
816
817   while (!@freeslot
818          ||
819          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
820   {
821     last THISROUND if $main::please_freeze;
822     if ($main::please_info)
823     {
824       $main::please_info = 0;
825       freeze();
826       create_output_collection();
827       save_meta(1);
828       update_progress_stats();
829     }
830     my $gotsome
831         = readfrompipes ()
832         + reapchildren ();
833     if (!$gotsome)
834     {
835       check_refresh_wanted();
836       check_squeue();
837       update_progress_stats();
838       select (undef, undef, undef, 0.1);
839     }
840     elsif (time - $progress_stats_updated >= 30)
841     {
842       update_progress_stats();
843     }
844     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
845         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
846     {
847       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
848           .($thisround_failed+$thisround_succeeded)
849           .") -- giving up on this round";
850       Log (undef, $message);
851       last THISROUND;
852     }
853
854     # move slots from freeslot to holdslot (or back to freeslot) if necessary
855     for (my $i=$#freeslot; $i>=0; $i--) {
856       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
857         push @holdslot, (splice @freeslot, $i, 1);
858       }
859     }
860     for (my $i=$#holdslot; $i>=0; $i--) {
861       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
862         push @freeslot, (splice @holdslot, $i, 1);
863       }
864     }
865
866     # give up if no nodes are succeeding
867     if (!grep { $_->{node}->{losing_streak} == 0 &&
868                     $_->{node}->{hold_count} < 4 } @slot) {
869       my $message = "Every node has failed -- giving up on this round";
870       Log (undef, $message);
871       last THISROUND;
872     }
873   }
874 }
875
876
877 push @freeslot, splice @holdslot;
878 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
879
880
881 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
882 while (%proc)
883 {
884   if ($main::please_continue) {
885     $main::please_continue = 0;
886     goto THISROUND;
887   }
888   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
889   readfrompipes ();
890   if (!reapchildren())
891   {
892     check_refresh_wanted();
893     check_squeue();
894     update_progress_stats();
895     select (undef, undef, undef, 0.1);
896     killem (keys %proc) if $main::please_freeze;
897   }
898 }
899
900 update_progress_stats();
901 freeze_if_want_freeze();
902
903
904 if (!defined $main::success)
905 {
906   if (@jobstep_todo &&
907       $thisround_succeeded == 0 &&
908       ($thisround_failed == 0 || $thisround_failed > 4))
909   {
910     my $message = "stop because $thisround_failed tasks failed and none succeeded";
911     Log (undef, $message);
912     $main::success = 0;
913   }
914   if (!@jobstep_todo)
915   {
916     $main::success = 1;
917   }
918 }
919
920 goto ONELEVEL if !defined $main::success;
921
922
923 release_allocation();
924 freeze();
925 my $collated_output = &create_output_collection();
926
927 if (!$collated_output) {
928   Log (undef, "Failed to write output collection");
929 }
930 else {
931   Log(undef, "output hash " . $collated_output);
932   $Job->update_attributes('output' => $collated_output);
933 }
934
935 Log (undef, "finish");
936
937 save_meta();
938
939 my $final_state;
940 if ($collated_output && $main::success) {
941   $final_state = 'Complete';
942 } else {
943   $final_state = 'Failed';
944 }
945 $Job->update_attributes('state' => $final_state);
946
947 exit (($final_state eq 'Complete') ? 0 : 1);
948
949
950
951 sub update_progress_stats
952 {
953   $progress_stats_updated = time;
954   return if !$progress_is_dirty;
955   my ($todo, $done, $running) = (scalar @jobstep_todo,
956                                  scalar @jobstep_done,
957                                  scalar @slot - scalar @freeslot - scalar @holdslot);
958   $Job->{'tasks_summary'} ||= {};
959   $Job->{'tasks_summary'}->{'todo'} = $todo;
960   $Job->{'tasks_summary'}->{'done'} = $done;
961   $Job->{'tasks_summary'}->{'running'} = $running;
962   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
963   Log (undef, "status: $done done, $running running, $todo todo");
964   $progress_is_dirty = 0;
965 }
966
967
968
969 sub reapchildren
970 {
971   my $pid = waitpid (-1, WNOHANG);
972   return 0 if $pid <= 0;
973
974   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
975                   . "."
976                   . $slot[$proc{$pid}->{slot}]->{cpu});
977   my $jobstepid = $proc{$pid}->{jobstep};
978   my $elapsed = time - $proc{$pid}->{time};
979   my $Jobstep = $jobstep[$jobstepid];
980
981   my $childstatus = $?;
982   my $exitvalue = $childstatus >> 8;
983   my $exitinfo = "exit ".exit_status_s($childstatus);
984   $Jobstep->{'arvados_task'}->reload;
985   my $task_success = $Jobstep->{'arvados_task'}->{success};
986
987   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
988
989   if (!defined $task_success) {
990     # task did not indicate one way or the other --> fail
991     $Jobstep->{'arvados_task'}->{success} = 0;
992     $Jobstep->{'arvados_task'}->save;
993     $task_success = 0;
994   }
995
996   if (!$task_success)
997   {
998     my $temporary_fail;
999     $temporary_fail ||= $Jobstep->{node_fail};
1000     $temporary_fail ||= ($exitvalue == 111);
1001
1002     ++$thisround_failed;
1003     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1004
1005     # Check for signs of a failed or misconfigured node
1006     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1007         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1008       # Don't count this against jobstep failure thresholds if this
1009       # node is already suspected faulty and srun exited quickly
1010       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1011           $elapsed < 5) {
1012         Log ($jobstepid, "blaming failure on suspect node " .
1013              $slot[$proc{$pid}->{slot}]->{node}->{name});
1014         $temporary_fail ||= 1;
1015       }
1016       ban_node_by_slot($proc{$pid}->{slot});
1017     }
1018
1019     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1020                              ++$Jobstep->{'failures'},
1021                              $temporary_fail ? 'temporary ' : 'permanent',
1022                              $elapsed));
1023
1024     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1025       # Give up on this task, and the whole job
1026       $main::success = 0;
1027       $main::please_freeze = 1;
1028     }
1029     # Put this task back on the todo queue
1030     push @jobstep_todo, $jobstepid;
1031     $Job->{'tasks_summary'}->{'failed'}++;
1032   }
1033   else
1034   {
1035     ++$thisround_succeeded;
1036     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1037     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1038     push @jobstep_done, $jobstepid;
1039     Log ($jobstepid, "success in $elapsed seconds");
1040   }
1041   $Jobstep->{exitcode} = $childstatus;
1042   $Jobstep->{finishtime} = time;
1043   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1044   $Jobstep->{'arvados_task'}->save;
1045   process_stderr ($jobstepid, $task_success);
1046   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1047
1048   close $reader{$jobstepid};
1049   delete $reader{$jobstepid};
1050   delete $slot[$proc{$pid}->{slot}]->{pid};
1051   push @freeslot, $proc{$pid}->{slot};
1052   delete $proc{$pid};
1053
1054   if ($task_success) {
1055     # Load new tasks
1056     my $newtask_list = [];
1057     my $newtask_results;
1058     do {
1059       $newtask_results = api_call(
1060         "job_tasks/list",
1061         'where' => {
1062           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1063         },
1064         'order' => 'qsequence',
1065         'offset' => scalar(@$newtask_list),
1066       );
1067       push(@$newtask_list, @{$newtask_results->{items}});
1068     } while (@{$newtask_results->{items}});
1069     foreach my $arvados_task (@$newtask_list) {
1070       my $jobstep = {
1071         'level' => $arvados_task->{'sequence'},
1072         'failures' => 0,
1073         'arvados_task' => $arvados_task
1074       };
1075       push @jobstep, $jobstep;
1076       push @jobstep_todo, $#jobstep;
1077     }
1078   }
1079
1080   $progress_is_dirty = 1;
1081   1;
1082 }
1083
1084 sub check_refresh_wanted
1085 {
1086   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1087   if (@stat && $stat[9] > $latest_refresh) {
1088     $latest_refresh = scalar time;
1089     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1090     for my $attr ('cancelled_at',
1091                   'cancelled_by_user_uuid',
1092                   'cancelled_by_client_uuid',
1093                   'state') {
1094       $Job->{$attr} = $Job2->{$attr};
1095     }
1096     if ($Job->{'state'} ne "Running") {
1097       if ($Job->{'state'} eq "Cancelled") {
1098         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1099       } else {
1100         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1101       }
1102       $main::success = 0;
1103       $main::please_freeze = 1;
1104     }
1105   }
1106 }
1107
1108 sub check_squeue
1109 {
1110   # return if the kill list was checked <4 seconds ago
1111   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1112   {
1113     return;
1114   }
1115   $squeue_kill_checked = time;
1116
1117   # use killem() on procs whose killtime is reached
1118   for (keys %proc)
1119   {
1120     if (exists $proc{$_}->{killtime}
1121         && $proc{$_}->{killtime} <= time)
1122     {
1123       killem ($_);
1124     }
1125   }
1126
1127   # return if the squeue was checked <60 seconds ago
1128   if (defined $squeue_checked && $squeue_checked > time - 60)
1129   {
1130     return;
1131   }
1132   $squeue_checked = time;
1133
1134   if (!$have_slurm)
1135   {
1136     # here is an opportunity to check for mysterious problems with local procs
1137     return;
1138   }
1139
1140   # get a list of steps still running
1141   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1142   chop @squeue;
1143   if ($squeue[-1] ne "ok")
1144   {
1145     return;
1146   }
1147   pop @squeue;
1148
1149   # which of my jobsteps are running, according to squeue?
1150   my %ok;
1151   foreach (@squeue)
1152   {
1153     if (/^(\d+)\.(\d+) (\S+)/)
1154     {
1155       if ($1 eq $ENV{SLURM_JOBID})
1156       {
1157         $ok{$3} = 1;
1158       }
1159     }
1160   }
1161
1162   # which of my active child procs (>60s old) were not mentioned by squeue?
1163   foreach (keys %proc)
1164   {
1165     if ($proc{$_}->{time} < time - 60
1166         && !exists $ok{$proc{$_}->{jobstepname}}
1167         && !exists $proc{$_}->{killtime})
1168     {
1169       # kill this proc if it hasn't exited in 30 seconds
1170       $proc{$_}->{killtime} = time + 30;
1171     }
1172   }
1173 }
1174
1175
1176 sub release_allocation
1177 {
1178   if ($have_slurm)
1179   {
1180     Log (undef, "release job allocation");
1181     system "scancel $ENV{SLURM_JOBID}";
1182   }
1183 }
1184
1185
1186 sub readfrompipes
1187 {
1188   my $gotsome = 0;
1189   foreach my $job (keys %reader)
1190   {
1191     my $buf;
1192     while (0 < sysread ($reader{$job}, $buf, 8192))
1193     {
1194       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1195       $jobstep[$job]->{stderr} .= $buf;
1196       preprocess_stderr ($job);
1197       if (length ($jobstep[$job]->{stderr}) > 16384)
1198       {
1199         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1200       }
1201       $gotsome = 1;
1202     }
1203   }
1204   return $gotsome;
1205 }
1206
1207
1208 sub preprocess_stderr
1209 {
1210   my $job = shift;
1211
1212   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1213     my $line = $1;
1214     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1215     Log ($job, "stderr $line");
1216     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1217       # whoa.
1218       $main::please_freeze = 1;
1219     }
1220     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1221       $jobstep[$job]->{node_fail} = 1;
1222       ban_node_by_slot($jobstep[$job]->{slotindex});
1223     }
1224   }
1225 }
1226
1227
1228 sub process_stderr
1229 {
1230   my $job = shift;
1231   my $task_success = shift;
1232   preprocess_stderr ($job);
1233
1234   map {
1235     Log ($job, "stderr $_");
1236   } split ("\n", $jobstep[$job]->{stderr});
1237 }
1238
1239 sub fetch_block
1240 {
1241   my $hash = shift;
1242   my ($keep, $child_out, $output_block);
1243
1244   my $cmd = "arv-get \Q$hash\E";
1245   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1246   $output_block = '';
1247   while (1) {
1248     my $buf;
1249     my $bytes = sysread($keep, $buf, 1024 * 1024);
1250     if (!defined $bytes) {
1251       die "reading from arv-get: $!";
1252     } elsif ($bytes == 0) {
1253       # sysread returns 0 at the end of the pipe.
1254       last;
1255     } else {
1256       # some bytes were read into buf.
1257       $output_block .= $buf;
1258     }
1259   }
1260   close $keep;
1261   return $output_block;
1262 }
1263
1264 # create_output_collections generates a new collection containing the
1265 # output of each successfully completed task, and returns the
1266 # portable_data_hash for the new collection.
1267 #
1268 sub create_output_collection
1269 {
1270   Log (undef, "collate");
1271
1272   my ($child_out, $child_in);
1273   my $pid = open2($child_out, $child_in, 'python', '-c',
1274                   'import arvados; ' .
1275                   'import sys; ' .
1276                   'print arvados.api()' .
1277                   '.collections()' .
1278                   '.create(body={"manifest_text":sys.stdin.read()})' .
1279                   '.execute()["portable_data_hash"]'
1280       );
1281
1282   for (@jobstep)
1283   {
1284     next if (!exists $_->{'arvados_task'}->{'output'} ||
1285              !$_->{'arvados_task'}->{'success'});
1286     my $output = $_->{'arvados_task'}->{output};
1287     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1288     {
1289       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1290       print $child_in $output;
1291     }
1292     elsif (defined (my $outblock = fetch_block ($output)))
1293     {
1294       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1295       print $child_in $outblock;
1296     }
1297     else
1298     {
1299       Log (undef, "XXX fetch_block($output) failed XXX");
1300       $main::success = 0;
1301     }
1302   }
1303   $child_in->close;
1304
1305   my $joboutput;
1306   my $s = IO::Select->new($child_out);
1307   if ($s->can_read(120)) {
1308     sysread($child_out, $joboutput, 64 * 1024 * 1024);
1309     chomp($joboutput);
1310     # TODO: Ensure exit status == 0.
1311   } else {
1312     Log (undef, "timed out while creating output collection");
1313   }
1314   # TODO: kill $pid instead of waiting, now that we've decided to
1315   # ignore further output.
1316   waitpid($pid, 0);
1317
1318   return $joboutput;
1319 }
1320
1321
1322 sub killem
1323 {
1324   foreach (@_)
1325   {
1326     my $sig = 2;                # SIGINT first
1327     if (exists $proc{$_}->{"sent_$sig"} &&
1328         time - $proc{$_}->{"sent_$sig"} > 4)
1329     {
1330       $sig = 15;                # SIGTERM if SIGINT doesn't work
1331     }
1332     if (exists $proc{$_}->{"sent_$sig"} &&
1333         time - $proc{$_}->{"sent_$sig"} > 4)
1334     {
1335       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1336     }
1337     if (!exists $proc{$_}->{"sent_$sig"})
1338     {
1339       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1340       kill $sig, $_;
1341       select (undef, undef, undef, 0.1);
1342       if ($sig == 2)
1343       {
1344         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1345       }
1346       $proc{$_}->{"sent_$sig"} = time;
1347       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1348     }
1349   }
1350 }
1351
1352
1353 sub fhbits
1354 {
1355   my($bits);
1356   for (@_) {
1357     vec($bits,fileno($_),1) = 1;
1358   }
1359   $bits;
1360 }
1361
1362
1363 # Send log output to Keep via arv-put.
1364 #
1365 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1366 # $log_pipe_pid is the pid of the arv-put subprocess.
1367 #
1368 # The only functions that should access these variables directly are:
1369 #
1370 # log_writer_start($logfilename)
1371 #     Starts an arv-put pipe, reading data on stdin and writing it to
1372 #     a $logfilename file in an output collection.
1373 #
1374 # log_writer_send($txt)
1375 #     Writes $txt to the output log collection.
1376 #
1377 # log_writer_finish()
1378 #     Closes the arv-put pipe and returns the output that it produces.
1379 #
1380 # log_writer_is_active()
1381 #     Returns a true value if there is currently a live arv-put
1382 #     process, false otherwise.
1383 #
1384 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1385
1386 sub log_writer_start($)
1387 {
1388   my $logfilename = shift;
1389   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1390                         'arv-put', '--portable-data-hash',
1391                         '--retries', '3',
1392                         '--filename', $logfilename,
1393                         '-');
1394 }
1395
1396 sub log_writer_send($)
1397 {
1398   my $txt = shift;
1399   print $log_pipe_in $txt;
1400 }
1401
1402 sub log_writer_finish()
1403 {
1404   return unless $log_pipe_pid;
1405
1406   close($log_pipe_in);
1407   my $arv_put_output;
1408
1409   my $s = IO::Select->new($log_pipe_out);
1410   if ($s->can_read(120)) {
1411     sysread($log_pipe_out, $arv_put_output, 1024);
1412     chomp($arv_put_output);
1413   } else {
1414     Log (undef, "timed out reading from 'arv-put'");
1415   }
1416
1417   waitpid($log_pipe_pid, 0);
1418   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1419   if ($?) {
1420     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1421   }
1422
1423   return $arv_put_output;
1424 }
1425
1426 sub log_writer_is_active() {
1427   return $log_pipe_pid;
1428 }
1429
1430 sub Log                         # ($jobstep_id, $logmessage)
1431 {
1432   if ($_[1] =~ /\n/) {
1433     for my $line (split (/\n/, $_[1])) {
1434       Log ($_[0], $line);
1435     }
1436     return;
1437   }
1438   my $fh = select STDERR; $|=1; select $fh;
1439   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1440   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1441   $message .= "\n";
1442   my $datetime;
1443   if (log_writer_is_active() || -t STDERR) {
1444     my @gmtime = gmtime;
1445     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1446                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1447   }
1448   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1449
1450   if (log_writer_is_active()) {
1451     log_writer_send($datetime . " " . $message);
1452   }
1453 }
1454
1455
1456 sub croak
1457 {
1458   my ($package, $file, $line) = caller;
1459   my $message = "@_ at $file line $line\n";
1460   Log (undef, $message);
1461   freeze() if @jobstep_todo;
1462   create_output_collection() if @jobstep_todo;
1463   cleanup();
1464   save_meta();
1465   die;
1466 }
1467
1468
1469 sub cleanup
1470 {
1471   return unless $Job;
1472   if ($Job->{'state'} eq 'Cancelled') {
1473     $Job->update_attributes('finished_at' => scalar gmtime);
1474   } else {
1475     $Job->update_attributes('state' => 'Failed');
1476   }
1477 }
1478
1479
1480 sub save_meta
1481 {
1482   my $justcheckpoint = shift; # false if this will be the last meta saved
1483   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1484   return unless log_writer_is_active();
1485
1486   my $loglocator = log_writer_finish();
1487   Log (undef, "log manifest is $loglocator");
1488   $Job->{'log'} = $loglocator;
1489   $Job->update_attributes('log', $loglocator);
1490 }
1491
1492
1493 sub freeze_if_want_freeze
1494 {
1495   if ($main::please_freeze)
1496   {
1497     release_allocation();
1498     if (@_)
1499     {
1500       # kill some srun procs before freeze+stop
1501       map { $proc{$_} = {} } @_;
1502       while (%proc)
1503       {
1504         killem (keys %proc);
1505         select (undef, undef, undef, 0.1);
1506         my $died;
1507         while (($died = waitpid (-1, WNOHANG)) > 0)
1508         {
1509           delete $proc{$died};
1510         }
1511       }
1512     }
1513     freeze();
1514     create_output_collection();
1515     cleanup();
1516     save_meta();
1517     exit 1;
1518   }
1519 }
1520
1521
1522 sub freeze
1523 {
1524   Log (undef, "Freeze not implemented");
1525   return;
1526 }
1527
1528
1529 sub thaw
1530 {
1531   croak ("Thaw not implemented");
1532 }
1533
1534
1535 sub freezequote
1536 {
1537   my $s = shift;
1538   $s =~ s/\\/\\\\/g;
1539   $s =~ s/\n/\\n/g;
1540   return $s;
1541 }
1542
1543
1544 sub freezeunquote
1545 {
1546   my $s = shift;
1547   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1548   return $s;
1549 }
1550
1551
1552 sub srun
1553 {
1554   my $srunargs = shift;
1555   my $execargs = shift;
1556   my $opts = shift || {};
1557   my $stdin = shift;
1558   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1559
1560   $Data::Dumper::Terse = 1;
1561   $Data::Dumper::Indent = 0;
1562   my $show_cmd = Dumper($args);
1563   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1564   $show_cmd =~ s/\n/ /g;
1565   warn "starting: $show_cmd\n";
1566
1567   if (defined $stdin) {
1568     my $child = open STDIN, "-|";
1569     defined $child or die "no fork: $!";
1570     if ($child == 0) {
1571       print $stdin or die $!;
1572       close STDOUT or die $!;
1573       exit 0;
1574     }
1575   }
1576
1577   return system (@$args) if $opts->{fork};
1578
1579   exec @$args;
1580   warn "ENV size is ".length(join(" ",%ENV));
1581   die "exec failed: $!: @$args";
1582 }
1583
1584
1585 sub ban_node_by_slot {
1586   # Don't start any new jobsteps on this node for 60 seconds
1587   my $slotid = shift;
1588   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1589   $slot[$slotid]->{node}->{hold_count}++;
1590   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1591 }
1592
1593 sub must_lock_now
1594 {
1595   my ($lockfile, $error_message) = @_;
1596   open L, ">", $lockfile or croak("$lockfile: $!");
1597   if (!flock L, LOCK_EX|LOCK_NB) {
1598     croak("Can't lock $lockfile: $error_message\n");
1599   }
1600 }
1601
1602 sub find_docker_image {
1603   # Given a Keep locator, check to see if it contains a Docker image.
1604   # If so, return its stream name and Docker hash.
1605   # If not, return undef for both values.
1606   my $locator = shift;
1607   my ($streamname, $filename);
1608   my $image = api_call("collections/get", uuid => $locator);
1609   if ($image) {
1610     foreach my $line (split(/\n/, $image->{manifest_text})) {
1611       my @tokens = split(/\s+/, $line);
1612       next if (!@tokens);
1613       $streamname = shift(@tokens);
1614       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1615         if (defined($filename)) {
1616           return (undef, undef);  # More than one file in the Collection.
1617         } else {
1618           $filename = (split(/:/, $filedata, 3))[2];
1619         }
1620       }
1621     }
1622   }
1623   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1624     return ($streamname, $1);
1625   } else {
1626     return (undef, undef);
1627   }
1628 }
1629
1630 sub retry_count {
1631   # Calculate the number of times an operation should be retried,
1632   # assuming exponential backoff, and that we're willing to retry as
1633   # long as tasks have been running.  Enforce a minimum of 3 retries.
1634   my ($starttime, $endtime, $timediff, $retries);
1635   if (@jobstep) {
1636     $starttime = $jobstep[0]->{starttime};
1637     $endtime = $jobstep[-1]->{finishtime};
1638   }
1639   if (!defined($starttime)) {
1640     $timediff = 0;
1641   } elsif (!defined($endtime)) {
1642     $timediff = time - $starttime;
1643   } else {
1644     $timediff = ($endtime - $starttime) - (time - $endtime);
1645   }
1646   if ($timediff > 0) {
1647     $retries = int(log($timediff) / log(2));
1648   } else {
1649     $retries = 1;  # Use the minimum.
1650   }
1651   return ($retries > 3) ? $retries : 3;
1652 }
1653
1654 sub retry_op {
1655   # Pass in two function references.
1656   # This method will be called with the remaining arguments.
1657   # If it dies, retry it with exponential backoff until it succeeds,
1658   # or until the current retry_count is exhausted.  After each failure
1659   # that can be retried, the second function will be called with
1660   # the current try count (0-based), next try time, and error message.
1661   my $operation = shift;
1662   my $retry_callback = shift;
1663   my $retries = retry_count();
1664   foreach my $try_count (0..$retries) {
1665     my $next_try = time + (2 ** $try_count);
1666     my $result = eval { $operation->(@_); };
1667     if (!$@) {
1668       return $result;
1669     } elsif ($try_count < $retries) {
1670       $retry_callback->($try_count, $next_try, $@);
1671       my $sleep_time = $next_try - time;
1672       sleep($sleep_time) if ($sleep_time > 0);
1673     }
1674   }
1675   # Ensure the error message ends in a newline, so Perl doesn't add
1676   # retry_op's line number to it.
1677   chomp($@);
1678   die($@ . "\n");
1679 }
1680
1681 sub api_call {
1682   # Pass in a /-separated API method name, and arguments for it.
1683   # This function will call that method, retrying as needed until
1684   # the current retry_count is exhausted, with a log on the first failure.
1685   my $method_name = shift;
1686   my $log_api_retry = sub {
1687     my ($try_count, $next_try_at, $errmsg) = @_;
1688     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1689     $errmsg =~ s/\s/ /g;
1690     $errmsg =~ s/\s+$//;
1691     my $retry_msg;
1692     if ($next_try_at < time) {
1693       $retry_msg = "Retrying.";
1694     } else {
1695       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1696       $retry_msg = "Retrying at $next_try_fmt.";
1697     }
1698     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1699   };
1700   my $method = $arv;
1701   foreach my $key (split(/\//, $method_name)) {
1702     $method = $method->{$key};
1703   }
1704   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1705 }
1706
1707 sub exit_status_s {
1708   # Given a $?, return a human-readable exit code string like "0" or
1709   # "1" or "0 with signal 1" or "1 with signal 11".
1710   my $exitcode = shift;
1711   my $s = $exitcode >> 8;
1712   if ($exitcode & 0x7f) {
1713     $s .= " with signal " . ($exitcode & 0x7f);
1714   }
1715   if ($exitcode & 0x80) {
1716     $s .= " with core dump";
1717   }
1718   return $s;
1719 }
1720
1721 __DATA__
1722 #!/usr/bin/perl
1723
1724 # checkout-and-build
1725
1726 use Fcntl ':flock';
1727 use File::Path qw( make_path remove_tree );
1728
1729 my $destdir = $ENV{"CRUNCH_SRC"};
1730 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1731 my $repo = $ENV{"CRUNCH_SRC_URL"};
1732 my $job_work = $ENV{"JOB_WORK"};
1733 my $task_work = $ENV{"TASK_WORK"};
1734
1735 for my $dir ($destdir, $job_work, $task_work) {
1736   if ($dir) {
1737     make_path $dir;
1738     -e $dir or die "Failed to create temporary directory ($dir): $!";
1739   }
1740 }
1741
1742 if ($task_work) {
1743   remove_tree($task_work, {keep_root => 1});
1744 }
1745
1746 my @git_archive_data = <DATA>;
1747 if (!@git_archive_data) {
1748   # Nothing to extract -> nothing to install.
1749   run_argv_and_exit();
1750 }
1751
1752 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1753 flock L, LOCK_EX;
1754 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1755   # This version already installed -> nothing to do.
1756   run_argv_and_exit();
1757 }
1758
1759 unlink "$destdir.commit";
1760 open STDERR_ORIG, ">&STDERR";
1761 open STDOUT, ">", "$destdir.log";
1762 open STDERR, ">&STDOUT";
1763
1764 mkdir $destdir;
1765 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1766 print TARX @git_archive_data;
1767 if(!close(TARX)) {
1768   die "'tar -C $destdir -xf -' exited $?: $!";
1769 }
1770
1771 my $pwd;
1772 chomp ($pwd = `pwd`);
1773 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1774 mkdir $install_dir;
1775
1776 for my $src_path ("$destdir/arvados/sdk/python") {
1777   if (-d $src_path) {
1778     shell_or_die ("virtualenv", $install_dir);
1779     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1780   }
1781 }
1782
1783 if (-e "$destdir/crunch_scripts/install") {
1784     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1785 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1786     # Old version
1787     shell_or_die ("./tests/autotests.sh", $install_dir);
1788 } elsif (-e "./install.sh") {
1789     shell_or_die ("./install.sh", $install_dir);
1790 }
1791
1792 if ($commit) {
1793     unlink "$destdir.commit.new";
1794     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1795     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1796 }
1797
1798 close L;
1799
1800 run_argv_and_exit();
1801
1802 sub run_argv_and_exit
1803 {
1804   if (@ARGV) {
1805     exec(@ARGV);
1806     die "Cannot exec `@ARGV`: $!";
1807   } else {
1808     exit 0;
1809   }
1810 }
1811
1812 sub shell_or_die
1813 {
1814   if ($ENV{"DEBUG"}) {
1815     print STDERR "@_\n";
1816   }
1817   if (system (@_) != 0) {
1818     my $err = $!;
1819     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1820     open STDERR, ">&STDERR_ORIG";
1821     system ("cat $destdir.log >&2");
1822     die "@_ failed ($err): $exitstatus";
1823   }
1824 }
1825
1826 __DATA__