4027: Crunch installs jobs' requested arvados_sdk_version.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Data::Dumper;
90 use Digest::MD5 qw(md5_hex);
91 use Getopt::Long;
92 use IPC::Open2;
93 use IO::Select;
94 use File::Temp;
95 use Fcntl ':flock';
96 use File::Path qw( make_path remove_tree );
97
98 use constant EX_TEMPFAIL => 75;
99
100 $ENV{"TMPDIR"} ||= "/tmp";
101 unless (defined $ENV{"CRUNCH_TMP"}) {
102   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
103   if ($ENV{"USER"} ne "crunch" && $< != 0) {
104     # use a tmp dir unique for my uid
105     $ENV{"CRUNCH_TMP"} .= "-$<";
106   }
107 }
108
109 # Create the tmp directory if it does not exist
110 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
111   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
112 }
113
114 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
115 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
116 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
117 mkdir ($ENV{"JOB_WORK"});
118
119 my $force_unlock;
120 my $git_dir;
121 my $jobspec;
122 my $job_api_token;
123 my $no_clear_tmp;
124 my $resume_stash;
125 GetOptions('force-unlock' => \$force_unlock,
126            'git-dir=s' => \$git_dir,
127            'job=s' => \$jobspec,
128            'job-api-token=s' => \$job_api_token,
129            'no-clear-tmp' => \$no_clear_tmp,
130            'resume-stash=s' => \$resume_stash,
131     );
132
133 if (defined $job_api_token) {
134   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
135 }
136
137 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
138 my $local_job = 0;
139
140
141 $SIG{'USR1'} = sub
142 {
143   $main::ENV{CRUNCH_DEBUG} = 1;
144 };
145 $SIG{'USR2'} = sub
146 {
147   $main::ENV{CRUNCH_DEBUG} = 0;
148 };
149
150
151
152 my $arv = Arvados->new('apiVersion' => 'v1');
153
154 my $Job;
155 my $job_id;
156 my $dbh;
157 my $sth;
158 my @jobstep;
159
160 my $User = api_call("users/current");
161
162 if ($jobspec =~ /^[-a-z\d]+$/)
163 {
164   # $jobspec is an Arvados UUID, not a JSON job specification
165   $Job = api_call("jobs/get", uuid => $jobspec);
166   if (!$force_unlock) {
167     # Claim this job, and make sure nobody else does
168     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
169     if ($@) {
170       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
171       exit EX_TEMPFAIL;
172     };
173   }
174 }
175 else
176 {
177   $Job = JSON::decode_json($jobspec);
178
179   if (!$resume_stash)
180   {
181     map { croak ("No $_ specified") unless $Job->{$_} }
182     qw(script script_version script_parameters);
183   }
184
185   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
186   $Job->{'started_at'} = gmtime;
187   $Job->{'state'} = 'Running';
188
189   $Job = api_call("jobs/create", job => $Job);
190 }
191 $job_id = $Job->{'uuid'};
192
193 my $keep_logfile = $job_id . '.log.txt';
194 log_writer_start($keep_logfile);
195
196 $Job->{'runtime_constraints'} ||= {};
197 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
198 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
199
200
201 Log (undef, "check slurm allocation");
202 my @slot;
203 my @node;
204 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
205 my @sinfo;
206 if (!$have_slurm)
207 {
208   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
209   push @sinfo, "$localcpus localhost";
210 }
211 if (exists $ENV{SLURM_NODELIST})
212 {
213   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
214 }
215 foreach (@sinfo)
216 {
217   my ($ncpus, $slurm_nodelist) = split;
218   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
219
220   my @nodelist;
221   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
222   {
223     my $nodelist = $1;
224     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
225     {
226       my $ranges = $1;
227       foreach (split (",", $ranges))
228       {
229         my ($a, $b);
230         if (/(\d+)-(\d+)/)
231         {
232           $a = $1;
233           $b = $2;
234         }
235         else
236         {
237           $a = $_;
238           $b = $_;
239         }
240         push @nodelist, map {
241           my $n = $nodelist;
242           $n =~ s/\[[-,\d]+\]/$_/;
243           $n;
244         } ($a..$b);
245       }
246     }
247     else
248     {
249       push @nodelist, $nodelist;
250     }
251   }
252   foreach my $nodename (@nodelist)
253   {
254     Log (undef, "node $nodename - $ncpus slots");
255     my $node = { name => $nodename,
256                  ncpus => $ncpus,
257                  losing_streak => 0,
258                  hold_until => 0 };
259     foreach my $cpu (1..$ncpus)
260     {
261       push @slot, { node => $node,
262                     cpu => $cpu };
263     }
264   }
265   push @node, @nodelist;
266 }
267
268
269
270 # Ensure that we get one jobstep running on each allocated node before
271 # we start overloading nodes with concurrent steps
272
273 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274
275
276 $Job->update_attributes(
277   'tasks_summary' => { 'failed' => 0,
278                        'todo' => 1,
279                        'running' => 0,
280                        'done' => 0 });
281
282 Log (undef, "start");
283 $SIG{'INT'} = sub { $main::please_freeze = 1; };
284 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
285 $SIG{'TERM'} = \&croak;
286 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
287 $SIG{'ALRM'} = sub { $main::please_info = 1; };
288 $SIG{'CONT'} = sub { $main::please_continue = 1; };
289 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
290
291 $main::please_freeze = 0;
292 $main::please_info = 0;
293 $main::please_continue = 0;
294 $main::please_refresh = 0;
295 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
296
297 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
298 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
299 $ENV{"JOB_UUID"} = $job_id;
300
301
302 my @jobstep_todo = ();
303 my @jobstep_done = ();
304 my @jobstep_tomerge = ();
305 my $jobstep_tomerge_level = 0;
306 my $squeue_checked;
307 my $squeue_kill_checked;
308 my $output_in_keep = 0;
309 my $latest_refresh = scalar time;
310
311
312
313 if (defined $Job->{thawedfromkey})
314 {
315   thaw ($Job->{thawedfromkey});
316 }
317 else
318 {
319   my $first_task = api_call("job_tasks/create", job_task => {
320     'job_uuid' => $Job->{'uuid'},
321     'sequence' => 0,
322     'qsequence' => 0,
323     'parameters' => {},
324   });
325   push @jobstep, { 'level' => 0,
326                    'failures' => 0,
327                    'arvados_task' => $first_task,
328                  };
329   push @jobstep_todo, 0;
330 }
331
332
333 if (!$have_slurm)
334 {
335   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
336 }
337
338 my $build_script = handle_readall(\*DATA);
339 my $nodelist = join(",", @node);
340 my $git_tar_count = 0;
341
342 if (!defined $no_clear_tmp) {
343   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
344   Log (undef, "Clean work dirs");
345
346   my $cleanpid = fork();
347   if ($cleanpid == 0)
348   {
349     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
351     exit (1);
352   }
353   while (1)
354   {
355     last if $cleanpid == waitpid (-1, WNOHANG);
356     freeze_if_want_freeze ($cleanpid);
357     select (undef, undef, undef, 0.1);
358   }
359   Log (undef, "Cleanup command exited ".exit_status_s($?));
360 }
361
362 # If this job requires a Docker image, install that.
363 my $docker_bin = "/usr/bin/docker.io";
364 my ($docker_locator, $docker_stream, $docker_hash);
365 if ($docker_locator = $Job->{docker_image_locator}) {
366   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
367   if (!$docker_hash)
368   {
369     croak("No Docker image hash found from locator $docker_locator");
370   }
371   $docker_stream =~ s/^\.//;
372   my $docker_install_script = qq{
373 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
374     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
375 fi
376 };
377   my $docker_pid = fork();
378   if ($docker_pid == 0)
379   {
380     srun (["srun", "--nodelist=" . join(',', @node)],
381           ["/bin/sh", "-ec", $docker_install_script]);
382     exit ($?);
383   }
384   while (1)
385   {
386     last if $docker_pid == waitpid (-1, WNOHANG);
387     freeze_if_want_freeze ($docker_pid);
388     select (undef, undef, undef, 0.1);
389   }
390   if ($? != 0)
391   {
392     croak("Installing Docker image from $docker_locator exited "
393           .exit_status_s($?));
394   }
395
396   if ($Job->{arvados_sdk_version}) {
397     # The job also specifies an Arvados SDK version.  Add the SDKs to the
398     # tar file for the build script to install.
399     add_git_archive("git", "--git-dir=$git_dir", "archive",
400                     "--prefix=.arvados.sdk/",
401                     $Job->{arvados_sdk_version}, "sdk");
402   }
403 }
404
405 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
406   # If script_version looks like an absolute path, *and* the --git-dir
407   # argument was not given -- which implies we were not invoked by
408   # crunch-dispatch -- we will use the given path as a working
409   # directory instead of resolving script_version to a git commit (or
410   # doing anything else with git).
411   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
412   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
413 }
414 else {
415   # Resolve the given script_version to a git commit sha1. Also, if
416   # the repository is remote, clone it into our local filesystem: this
417   # ensures "git archive" will work, and is necessary to reliably
418   # resolve a symbolic script_version like "master^".
419   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
420
421   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
422
423   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
424
425   # If we're running under crunch-dispatch, it will have already
426   # pulled the appropriate source tree into its own repository, and
427   # given us that repo's path as $git_dir.
428   #
429   # If we're running a "local" job, we might have to fetch content
430   # from a remote repository.
431   #
432   # (Currently crunch-dispatch gives a local path with --git-dir, but
433   # we might as well accept URLs there too in case it changes its
434   # mind.)
435   my $repo = $git_dir || $Job->{'repository'};
436
437   # Repository can be remote or local. If remote, we'll need to fetch it
438   # to a local dir before doing `git log` et al.
439   my $repo_location;
440
441   if ($repo =~ m{://|^[^/]*:}) {
442     # $repo is a git url we can clone, like git:// or https:// or
443     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
444     # not recognized here because distinguishing that from a local
445     # path is too fragile. If you really need something strange here,
446     # use the ssh:// form.
447     $repo_location = 'remote';
448   } elsif ($repo =~ m{^\.*/}) {
449     # $repo is a local path to a git index. We'll also resolve ../foo
450     # to ../foo/.git if the latter is a directory. To help
451     # disambiguate local paths from named hosted repositories, this
452     # form must be given as ./ or ../ if it's a relative path.
453     if (-d "$repo/.git") {
454       $repo = "$repo/.git";
455     }
456     $repo_location = 'local';
457   } else {
458     # $repo is none of the above. It must be the name of a hosted
459     # repository.
460     my $arv_repo_list = api_call("repositories/list",
461                                  'filters' => [['name','=',$repo]]);
462     my @repos_found = @{$arv_repo_list->{'items'}};
463     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
464     if ($n_found > 0) {
465       Log(undef, "Repository '$repo' -> "
466           . join(", ", map { $_->{'uuid'} } @repos_found));
467     }
468     if ($n_found != 1) {
469       croak("Error: Found $n_found repositories with name '$repo'.");
470     }
471     $repo = $repos_found[0]->{'fetch_url'};
472     $repo_location = 'remote';
473   }
474   Log(undef, "Using $repo_location repository '$repo'");
475   $ENV{"CRUNCH_SRC_URL"} = $repo;
476
477   # Resolve given script_version (we'll call that $treeish here) to a
478   # commit sha1 ($commit).
479   my $treeish = $Job->{'script_version'};
480   my $commit;
481   if ($repo_location eq 'remote') {
482     # We minimize excess object-fetching by re-using the same bare
483     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
484     # just keep adding remotes to it as needed.
485     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
486     my $gitcmd = "git --git-dir=\Q$local_repo\E";
487
488     # Set up our local repo for caching remote objects, making
489     # archives, etc.
490     if (!-d $local_repo) {
491       make_path($local_repo) or croak("Error: could not create $local_repo");
492     }
493     # This works (exits 0 and doesn't delete fetched objects) even
494     # if $local_repo is already initialized:
495     `$gitcmd init --bare`;
496     if ($?) {
497       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
498     }
499
500     # If $treeish looks like a hash (or abbrev hash) we look it up in
501     # our local cache first, since that's cheaper. (We don't want to
502     # do that with tags/branches though -- those change over time, so
503     # they should always be resolved by the remote repo.)
504     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
505       # Hide stderr because it's normal for this to fail:
506       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
507       if ($? == 0 &&
508           # Careful not to resolve a branch named abcdeff to commit 1234567:
509           $sha1 =~ /^$treeish/ &&
510           $sha1 =~ /^([0-9a-f]{40})$/s) {
511         $commit = $1;
512         Log(undef, "Commit $commit already present in $local_repo");
513       }
514     }
515
516     if (!defined $commit) {
517       # If $treeish isn't just a hash or abbrev hash, or isn't here
518       # yet, we need to fetch the remote to resolve it correctly.
519
520       # First, remove all local heads. This prevents a name that does
521       # not exist on the remote from resolving to (or colliding with)
522       # a previously fetched branch or tag (possibly from a different
523       # remote).
524       remove_tree("$local_repo/refs/heads", {keep_root => 1});
525
526       Log(undef, "Fetching objects from $repo to $local_repo");
527       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
528       if ($?) {
529         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
530       }
531     }
532
533     # Now that the data is all here, we will use our local repo for
534     # the rest of our git activities.
535     $repo = $local_repo;
536   }
537
538   my $gitcmd = "git --git-dir=\Q$repo\E";
539   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
540   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
541     croak("`$gitcmd rev-list` exited "
542           .exit_status_s($?)
543           .", '$treeish' not found. Giving up.");
544   }
545   $commit = $1;
546   Log(undef, "Version $treeish is commit $commit");
547
548   if ($commit ne $Job->{'script_version'}) {
549     # Record the real commit id in the database, frozentokey, logs,
550     # etc. -- instead of an abbreviation or a branch name which can
551     # become ambiguous or point to a different commit in the future.
552     if (!$Job->update_attributes('script_version' => $commit)) {
553       croak("Error: failed to update job's script_version attribute");
554     }
555   }
556
557   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
558   add_git_archive("$gitcmd archive ''\Q$commit\E");
559 }
560
561 my $git_archive = combined_git_archive();
562 if (!defined $git_archive) {
563   Log(undef, "Skip install phase (no git archive)");
564   if ($have_slurm) {
565     Log(undef, "Warning: This probably means workers have no source tree!");
566   }
567 }
568 else {
569   Log(undef, "Run install script on all workers");
570
571   my @srunargs = ("srun",
572                   "--nodelist=$nodelist",
573                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
574   my @execargs = ("sh", "-c",
575                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
576
577   my $installpid = fork();
578   if ($installpid == 0)
579   {
580     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
581     exit (1);
582   }
583   while (1)
584   {
585     last if $installpid == waitpid (-1, WNOHANG);
586     freeze_if_want_freeze ($installpid);
587     select (undef, undef, undef, 0.1);
588   }
589   my $install_exited = $?;
590   Log (undef, "Install script exited ".exit_status_s($install_exited));
591   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
592     unlink($tar_filename);
593   }
594   exit (1) if $install_exited != 0;
595 }
596
597 foreach (qw (script script_version script_parameters runtime_constraints))
598 {
599   Log (undef,
600        "$_ " .
601        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
602 }
603 foreach (split (/\n/, $Job->{knobs}))
604 {
605   Log (undef, "knob " . $_);
606 }
607
608
609
610 $main::success = undef;
611
612
613
614 ONELEVEL:
615
616 my $thisround_succeeded = 0;
617 my $thisround_failed = 0;
618 my $thisround_failed_multiple = 0;
619
620 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
621                        or $a <=> $b } @jobstep_todo;
622 my $level = $jobstep[$jobstep_todo[0]]->{level};
623 Log (undef, "start level $level");
624
625
626
627 my %proc;
628 my @freeslot = (0..$#slot);
629 my @holdslot;
630 my %reader;
631 my $progress_is_dirty = 1;
632 my $progress_stats_updated = 0;
633
634 update_progress_stats();
635
636
637
638 THISROUND:
639 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
640 {
641   my $id = $jobstep_todo[$todo_ptr];
642   my $Jobstep = $jobstep[$id];
643   if ($Jobstep->{level} != $level)
644   {
645     next;
646   }
647
648   pipe $reader{$id}, "writer" or croak ($!);
649   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
650   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
651
652   my $childslot = $freeslot[0];
653   my $childnode = $slot[$childslot]->{node};
654   my $childslotname = join (".",
655                             $slot[$childslot]->{node}->{name},
656                             $slot[$childslot]->{cpu});
657   my $childpid = fork();
658   if ($childpid == 0)
659   {
660     $SIG{'INT'} = 'DEFAULT';
661     $SIG{'QUIT'} = 'DEFAULT';
662     $SIG{'TERM'} = 'DEFAULT';
663
664     foreach (values (%reader))
665     {
666       close($_);
667     }
668     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
669     open(STDOUT,">&writer");
670     open(STDERR,">&writer");
671
672     undef $dbh;
673     undef $sth;
674
675     delete $ENV{"GNUPGHOME"};
676     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
677     $ENV{"TASK_QSEQUENCE"} = $id;
678     $ENV{"TASK_SEQUENCE"} = $level;
679     $ENV{"JOB_SCRIPT"} = $Job->{script};
680     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
681       $param =~ tr/a-z/A-Z/;
682       $ENV{"JOB_PARAMETER_$param"} = $value;
683     }
684     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
685     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
686     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
687     $ENV{"HOME"} = $ENV{"TASK_WORK"};
688     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
689     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
690     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
691     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
692
693     $ENV{"GZIP"} = "-n";
694
695     my @srunargs = (
696       "srun",
697       "--nodelist=".$childnode->{name},
698       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
699       "--job-name=$job_id.$id.$$",
700         );
701     my $command =
702         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
703         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
704         ."&& cd $ENV{CRUNCH_TMP} ";
705     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
706     if ($docker_hash)
707     {
708       my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
709       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
710       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
711
712       # Dynamically configure the container to use the host system as its
713       # DNS server.  Get the host's global addresses from the ip command,
714       # and turn them into docker --dns options using gawk.
715       $command .=
716           q{$(ip -o address show scope global |
717               gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
718
719       # The source tree and $destdir directory (which we have
720       # installed on the worker host) are available in the container,
721       # under the same path.
722       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
723       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
724
725       # Currently, we make arv-mount's mount point appear at /keep
726       # inside the container (instead of using the same path as the
727       # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
728       # crunch scripts and utilities must not rely on this. They must
729       # use $TASK_KEEPMOUNT.
730       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
731       $ENV{TASK_KEEPMOUNT} = "/keep";
732
733       # TASK_WORK is almost exactly like a docker data volume: it
734       # starts out empty, is writable, and persists until no
735       # containers use it any more. We don't use --volumes-from to
736       # share it with other containers: it is only accessible to this
737       # task, and it goes away when this task stops.
738       #
739       # However, a docker data volume is writable only by root unless
740       # the mount point already happens to exist in the container with
741       # different permissions. Therefore, we [1] assume /tmp already
742       # exists in the image and is writable by the crunch user; [2]
743       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
744       # writable if they are created by docker while setting up the
745       # other --volumes); and [3] create $TASK_WORK inside the
746       # container using $build_script.
747       $command .= "--volume=/tmp ";
748       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
749       $ENV{"HOME"} = $ENV{"TASK_WORK"};
750       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
751
752       # TODO: Share a single JOB_WORK volume across all task
753       # containers on a given worker node, and delete it when the job
754       # ends (and, in case that doesn't work, when the next job
755       # starts).
756       #
757       # For now, use the same approach as TASK_WORK above.
758       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
759
760       while (my ($env_key, $env_val) = each %ENV)
761       {
762         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
763           $command .= "--env=\Q$env_key=$env_val\E ";
764         }
765       }
766       $command .= "--env=\QHOME=$ENV{HOME}\E ";
767       $command .= "\Q$docker_hash\E ";
768       $command .= "stdbuf --output=0 --error=0 ";
769       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
770     } else {
771       # Non-docker run
772       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
773       $command .= "stdbuf --output=0 --error=0 ";
774       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
775     }
776
777     my @execargs = ('bash', '-c', $command);
778     srun (\@srunargs, \@execargs, undef, $build_script);
779     # exec() failed, we assume nothing happened.
780     die "srun() failed on build script\n";
781   }
782   close("writer");
783   if (!defined $childpid)
784   {
785     close $reader{$id};
786     delete $reader{$id};
787     next;
788   }
789   shift @freeslot;
790   $proc{$childpid} = { jobstep => $id,
791                        time => time,
792                        slot => $childslot,
793                        jobstepname => "$job_id.$id.$childpid",
794                      };
795   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
796   $slot[$childslot]->{pid} = $childpid;
797
798   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
799   Log ($id, "child $childpid started on $childslotname");
800   $Jobstep->{starttime} = time;
801   $Jobstep->{node} = $childnode->{name};
802   $Jobstep->{slotindex} = $childslot;
803   delete $Jobstep->{stderr};
804   delete $Jobstep->{finishtime};
805
806   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
807   $Jobstep->{'arvados_task'}->save;
808
809   splice @jobstep_todo, $todo_ptr, 1;
810   --$todo_ptr;
811
812   $progress_is_dirty = 1;
813
814   while (!@freeslot
815          ||
816          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
817   {
818     last THISROUND if $main::please_freeze;
819     if ($main::please_info)
820     {
821       $main::please_info = 0;
822       freeze();
823       collate_output();
824       save_meta(1);
825       update_progress_stats();
826     }
827     my $gotsome
828         = readfrompipes ()
829         + reapchildren ();
830     if (!$gotsome)
831     {
832       check_refresh_wanted();
833       check_squeue();
834       update_progress_stats();
835       select (undef, undef, undef, 0.1);
836     }
837     elsif (time - $progress_stats_updated >= 30)
838     {
839       update_progress_stats();
840     }
841     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
842         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
843     {
844       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
845           .($thisround_failed+$thisround_succeeded)
846           .") -- giving up on this round";
847       Log (undef, $message);
848       last THISROUND;
849     }
850
851     # move slots from freeslot to holdslot (or back to freeslot) if necessary
852     for (my $i=$#freeslot; $i>=0; $i--) {
853       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
854         push @holdslot, (splice @freeslot, $i, 1);
855       }
856     }
857     for (my $i=$#holdslot; $i>=0; $i--) {
858       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
859         push @freeslot, (splice @holdslot, $i, 1);
860       }
861     }
862
863     # give up if no nodes are succeeding
864     if (!grep { $_->{node}->{losing_streak} == 0 &&
865                     $_->{node}->{hold_count} < 4 } @slot) {
866       my $message = "Every node has failed -- giving up on this round";
867       Log (undef, $message);
868       last THISROUND;
869     }
870   }
871 }
872
873
874 push @freeslot, splice @holdslot;
875 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
876
877
878 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
879 while (%proc)
880 {
881   if ($main::please_continue) {
882     $main::please_continue = 0;
883     goto THISROUND;
884   }
885   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
886   readfrompipes ();
887   if (!reapchildren())
888   {
889     check_refresh_wanted();
890     check_squeue();
891     update_progress_stats();
892     select (undef, undef, undef, 0.1);
893     killem (keys %proc) if $main::please_freeze;
894   }
895 }
896
897 update_progress_stats();
898 freeze_if_want_freeze();
899
900
901 if (!defined $main::success)
902 {
903   if (@jobstep_todo &&
904       $thisround_succeeded == 0 &&
905       ($thisround_failed == 0 || $thisround_failed > 4))
906   {
907     my $message = "stop because $thisround_failed tasks failed and none succeeded";
908     Log (undef, $message);
909     $main::success = 0;
910   }
911   if (!@jobstep_todo)
912   {
913     $main::success = 1;
914   }
915 }
916
917 goto ONELEVEL if !defined $main::success;
918
919
920 release_allocation();
921 freeze();
922 my $collated_output = &collate_output();
923
924 if (!$collated_output) {
925   Log(undef, "output undef");
926 }
927 else {
928   eval {
929     open(my $orig_manifest, '-|', 'arv-get', $collated_output)
930         or die "failed to get collated manifest: $!";
931     my $orig_manifest_text = '';
932     while (my $manifest_line = <$orig_manifest>) {
933       $orig_manifest_text .= $manifest_line;
934     }
935     my $output = api_call("collections/create", collection => {
936       'manifest_text' => $orig_manifest_text});
937     Log(undef, "output uuid " . $output->{uuid});
938     Log(undef, "output hash " . $output->{portable_data_hash});
939     $Job->update_attributes('output' => $output->{portable_data_hash});
940   };
941   if ($@) {
942     Log (undef, "Failed to register output manifest: $@");
943   }
944 }
945
946 Log (undef, "finish");
947
948 save_meta();
949
950 my $final_state;
951 if ($collated_output && $main::success) {
952   $final_state = 'Complete';
953 } else {
954   $final_state = 'Failed';
955 }
956 $Job->update_attributes('state' => $final_state);
957
958 exit (($final_state eq 'Complete') ? 0 : 1);
959
960
961
962 sub update_progress_stats
963 {
964   $progress_stats_updated = time;
965   return if !$progress_is_dirty;
966   my ($todo, $done, $running) = (scalar @jobstep_todo,
967                                  scalar @jobstep_done,
968                                  scalar @slot - scalar @freeslot - scalar @holdslot);
969   $Job->{'tasks_summary'} ||= {};
970   $Job->{'tasks_summary'}->{'todo'} = $todo;
971   $Job->{'tasks_summary'}->{'done'} = $done;
972   $Job->{'tasks_summary'}->{'running'} = $running;
973   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
974   Log (undef, "status: $done done, $running running, $todo todo");
975   $progress_is_dirty = 0;
976 }
977
978
979
980 sub reapchildren
981 {
982   my $pid = waitpid (-1, WNOHANG);
983   return 0 if $pid <= 0;
984
985   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
986                   . "."
987                   . $slot[$proc{$pid}->{slot}]->{cpu});
988   my $jobstepid = $proc{$pid}->{jobstep};
989   my $elapsed = time - $proc{$pid}->{time};
990   my $Jobstep = $jobstep[$jobstepid];
991
992   my $childstatus = $?;
993   my $exitvalue = $childstatus >> 8;
994   my $exitinfo = "exit ".exit_status_s($childstatus);
995   $Jobstep->{'arvados_task'}->reload;
996   my $task_success = $Jobstep->{'arvados_task'}->{success};
997
998   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
999
1000   if (!defined $task_success) {
1001     # task did not indicate one way or the other --> fail
1002     $Jobstep->{'arvados_task'}->{success} = 0;
1003     $Jobstep->{'arvados_task'}->save;
1004     $task_success = 0;
1005   }
1006
1007   if (!$task_success)
1008   {
1009     my $temporary_fail;
1010     $temporary_fail ||= $Jobstep->{node_fail};
1011     $temporary_fail ||= ($exitvalue == 111);
1012
1013     ++$thisround_failed;
1014     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1015
1016     # Check for signs of a failed or misconfigured node
1017     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1018         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1019       # Don't count this against jobstep failure thresholds if this
1020       # node is already suspected faulty and srun exited quickly
1021       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1022           $elapsed < 5) {
1023         Log ($jobstepid, "blaming failure on suspect node " .
1024              $slot[$proc{$pid}->{slot}]->{node}->{name});
1025         $temporary_fail ||= 1;
1026       }
1027       ban_node_by_slot($proc{$pid}->{slot});
1028     }
1029
1030     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1031                              ++$Jobstep->{'failures'},
1032                              $temporary_fail ? 'temporary ' : 'permanent',
1033                              $elapsed));
1034
1035     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1036       # Give up on this task, and the whole job
1037       $main::success = 0;
1038       $main::please_freeze = 1;
1039     }
1040     # Put this task back on the todo queue
1041     push @jobstep_todo, $jobstepid;
1042     $Job->{'tasks_summary'}->{'failed'}++;
1043   }
1044   else
1045   {
1046     ++$thisround_succeeded;
1047     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1048     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1049     push @jobstep_done, $jobstepid;
1050     Log ($jobstepid, "success in $elapsed seconds");
1051   }
1052   $Jobstep->{exitcode} = $childstatus;
1053   $Jobstep->{finishtime} = time;
1054   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1055   $Jobstep->{'arvados_task'}->save;
1056   process_stderr ($jobstepid, $task_success);
1057   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
1058
1059   close $reader{$jobstepid};
1060   delete $reader{$jobstepid};
1061   delete $slot[$proc{$pid}->{slot}]->{pid};
1062   push @freeslot, $proc{$pid}->{slot};
1063   delete $proc{$pid};
1064
1065   if ($task_success) {
1066     # Load new tasks
1067     my $newtask_list = [];
1068     my $newtask_results;
1069     do {
1070       $newtask_results = api_call(
1071         "job_tasks/list",
1072         'where' => {
1073           'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1074         },
1075         'order' => 'qsequence',
1076         'offset' => scalar(@$newtask_list),
1077       );
1078       push(@$newtask_list, @{$newtask_results->{items}});
1079     } while (@{$newtask_results->{items}});
1080     foreach my $arvados_task (@$newtask_list) {
1081       my $jobstep = {
1082         'level' => $arvados_task->{'sequence'},
1083         'failures' => 0,
1084         'arvados_task' => $arvados_task
1085       };
1086       push @jobstep, $jobstep;
1087       push @jobstep_todo, $#jobstep;
1088     }
1089   }
1090
1091   $progress_is_dirty = 1;
1092   1;
1093 }
1094
1095 sub check_refresh_wanted
1096 {
1097   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1098   if (@stat && $stat[9] > $latest_refresh) {
1099     $latest_refresh = scalar time;
1100     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1101     for my $attr ('cancelled_at',
1102                   'cancelled_by_user_uuid',
1103                   'cancelled_by_client_uuid',
1104                   'state') {
1105       $Job->{$attr} = $Job2->{$attr};
1106     }
1107     if ($Job->{'state'} ne "Running") {
1108       if ($Job->{'state'} eq "Cancelled") {
1109         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1110       } else {
1111         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1112       }
1113       $main::success = 0;
1114       $main::please_freeze = 1;
1115     }
1116   }
1117 }
1118
1119 sub check_squeue
1120 {
1121   # return if the kill list was checked <4 seconds ago
1122   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1123   {
1124     return;
1125   }
1126   $squeue_kill_checked = time;
1127
1128   # use killem() on procs whose killtime is reached
1129   for (keys %proc)
1130   {
1131     if (exists $proc{$_}->{killtime}
1132         && $proc{$_}->{killtime} <= time)
1133     {
1134       killem ($_);
1135     }
1136   }
1137
1138   # return if the squeue was checked <60 seconds ago
1139   if (defined $squeue_checked && $squeue_checked > time - 60)
1140   {
1141     return;
1142   }
1143   $squeue_checked = time;
1144
1145   if (!$have_slurm)
1146   {
1147     # here is an opportunity to check for mysterious problems with local procs
1148     return;
1149   }
1150
1151   # get a list of steps still running
1152   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1153   chop @squeue;
1154   if ($squeue[-1] ne "ok")
1155   {
1156     return;
1157   }
1158   pop @squeue;
1159
1160   # which of my jobsteps are running, according to squeue?
1161   my %ok;
1162   foreach (@squeue)
1163   {
1164     if (/^(\d+)\.(\d+) (\S+)/)
1165     {
1166       if ($1 eq $ENV{SLURM_JOBID})
1167       {
1168         $ok{$3} = 1;
1169       }
1170     }
1171   }
1172
1173   # which of my active child procs (>60s old) were not mentioned by squeue?
1174   foreach (keys %proc)
1175   {
1176     if ($proc{$_}->{time} < time - 60
1177         && !exists $ok{$proc{$_}->{jobstepname}}
1178         && !exists $proc{$_}->{killtime})
1179     {
1180       # kill this proc if it hasn't exited in 30 seconds
1181       $proc{$_}->{killtime} = time + 30;
1182     }
1183   }
1184 }
1185
1186
1187 sub release_allocation
1188 {
1189   if ($have_slurm)
1190   {
1191     Log (undef, "release job allocation");
1192     system "scancel $ENV{SLURM_JOBID}";
1193   }
1194 }
1195
1196
1197 sub readfrompipes
1198 {
1199   my $gotsome = 0;
1200   foreach my $job (keys %reader)
1201   {
1202     my $buf;
1203     while (0 < sysread ($reader{$job}, $buf, 8192))
1204     {
1205       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1206       $jobstep[$job]->{stderr} .= $buf;
1207       preprocess_stderr ($job);
1208       if (length ($jobstep[$job]->{stderr}) > 16384)
1209       {
1210         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1211       }
1212       $gotsome = 1;
1213     }
1214   }
1215   return $gotsome;
1216 }
1217
1218
1219 sub preprocess_stderr
1220 {
1221   my $job = shift;
1222
1223   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1224     my $line = $1;
1225     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1226     Log ($job, "stderr $line");
1227     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1228       # whoa.
1229       $main::please_freeze = 1;
1230     }
1231     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1232       $jobstep[$job]->{node_fail} = 1;
1233       ban_node_by_slot($jobstep[$job]->{slotindex});
1234     }
1235   }
1236 }
1237
1238
1239 sub process_stderr
1240 {
1241   my $job = shift;
1242   my $task_success = shift;
1243   preprocess_stderr ($job);
1244
1245   map {
1246     Log ($job, "stderr $_");
1247   } split ("\n", $jobstep[$job]->{stderr});
1248 }
1249
1250 sub fetch_block
1251 {
1252   my $hash = shift;
1253   my ($keep, $child_out, $output_block);
1254
1255   my $cmd = "arv-get \Q$hash\E";
1256   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1257   $output_block = '';
1258   while (1) {
1259     my $buf;
1260     my $bytes = sysread($keep, $buf, 1024 * 1024);
1261     if (!defined $bytes) {
1262       die "reading from arv-get: $!";
1263     } elsif ($bytes == 0) {
1264       # sysread returns 0 at the end of the pipe.
1265       last;
1266     } else {
1267       # some bytes were read into buf.
1268       $output_block .= $buf;
1269     }
1270   }
1271   close $keep;
1272   return $output_block;
1273 }
1274
1275 sub collate_output
1276 {
1277   Log (undef, "collate");
1278
1279   my ($child_out, $child_in);
1280   my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1281                   '--retries', retry_count());
1282   my $joboutput;
1283   for (@jobstep)
1284   {
1285     next if (!exists $_->{'arvados_task'}->{'output'} ||
1286              !$_->{'arvados_task'}->{'success'});
1287     my $output = $_->{'arvados_task'}->{output};
1288     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1289     {
1290       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1291       print $child_in $output;
1292     }
1293     elsif (@jobstep == 1)
1294     {
1295       $joboutput = $output;
1296       last;
1297     }
1298     elsif (defined (my $outblock = fetch_block ($output)))
1299     {
1300       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1301       print $child_in $outblock;
1302     }
1303     else
1304     {
1305       Log (undef, "XXX fetch_block($output) failed XXX");
1306       $main::success = 0;
1307     }
1308   }
1309   $child_in->close;
1310
1311   if (!defined $joboutput) {
1312     my $s = IO::Select->new($child_out);
1313     if ($s->can_read(120)) {
1314       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1315       chomp($joboutput);
1316       # TODO: Ensure exit status == 0.
1317     } else {
1318       Log (undef, "timed out reading from 'arv-put'");
1319     }
1320   }
1321   # TODO: kill $pid instead of waiting, now that we've decided to
1322   # ignore further output.
1323   waitpid($pid, 0);
1324
1325   return $joboutput;
1326 }
1327
1328
1329 sub killem
1330 {
1331   foreach (@_)
1332   {
1333     my $sig = 2;                # SIGINT first
1334     if (exists $proc{$_}->{"sent_$sig"} &&
1335         time - $proc{$_}->{"sent_$sig"} > 4)
1336     {
1337       $sig = 15;                # SIGTERM if SIGINT doesn't work
1338     }
1339     if (exists $proc{$_}->{"sent_$sig"} &&
1340         time - $proc{$_}->{"sent_$sig"} > 4)
1341     {
1342       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1343     }
1344     if (!exists $proc{$_}->{"sent_$sig"})
1345     {
1346       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1347       kill $sig, $_;
1348       select (undef, undef, undef, 0.1);
1349       if ($sig == 2)
1350       {
1351         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1352       }
1353       $proc{$_}->{"sent_$sig"} = time;
1354       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1355     }
1356   }
1357 }
1358
1359
1360 sub fhbits
1361 {
1362   my($bits);
1363   for (@_) {
1364     vec($bits,fileno($_),1) = 1;
1365   }
1366   $bits;
1367 }
1368
1369
1370 # Send log output to Keep via arv-put.
1371 #
1372 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1373 # $log_pipe_pid is the pid of the arv-put subprocess.
1374 #
1375 # The only functions that should access these variables directly are:
1376 #
1377 # log_writer_start($logfilename)
1378 #     Starts an arv-put pipe, reading data on stdin and writing it to
1379 #     a $logfilename file in an output collection.
1380 #
1381 # log_writer_send($txt)
1382 #     Writes $txt to the output log collection.
1383 #
1384 # log_writer_finish()
1385 #     Closes the arv-put pipe and returns the output that it produces.
1386 #
1387 # log_writer_is_active()
1388 #     Returns a true value if there is currently a live arv-put
1389 #     process, false otherwise.
1390 #
1391 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
1392
1393 sub log_writer_start($)
1394 {
1395   my $logfilename = shift;
1396   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1397                         'arv-put', '--portable-data-hash',
1398                         '--retries', '3',
1399                         '--filename', $logfilename,
1400                         '-');
1401 }
1402
1403 sub log_writer_send($)
1404 {
1405   my $txt = shift;
1406   print $log_pipe_in $txt;
1407 }
1408
1409 sub log_writer_finish()
1410 {
1411   return unless $log_pipe_pid;
1412
1413   close($log_pipe_in);
1414   my $arv_put_output;
1415
1416   my $s = IO::Select->new($log_pipe_out);
1417   if ($s->can_read(120)) {
1418     sysread($log_pipe_out, $arv_put_output, 1024);
1419     chomp($arv_put_output);
1420   } else {
1421     Log (undef, "timed out reading from 'arv-put'");
1422   }
1423
1424   waitpid($log_pipe_pid, 0);
1425   $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1426   if ($?) {
1427     Log("log_writer_finish: arv-put exited ".exit_status_s($?))
1428   }
1429
1430   return $arv_put_output;
1431 }
1432
1433 sub log_writer_is_active() {
1434   return $log_pipe_pid;
1435 }
1436
1437 sub Log                         # ($jobstep_id, $logmessage)
1438 {
1439   if ($_[1] =~ /\n/) {
1440     for my $line (split (/\n/, $_[1])) {
1441       Log ($_[0], $line);
1442     }
1443     return;
1444   }
1445   my $fh = select STDERR; $|=1; select $fh;
1446   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1447   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1448   $message .= "\n";
1449   my $datetime;
1450   if (log_writer_is_active() || -t STDERR) {
1451     my @gmtime = gmtime;
1452     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1453                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1454   }
1455   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1456
1457   if (log_writer_is_active()) {
1458     log_writer_send($datetime . " " . $message);
1459   }
1460 }
1461
1462
1463 sub croak
1464 {
1465   my ($package, $file, $line) = caller;
1466   my $message = "@_ at $file line $line\n";
1467   Log (undef, $message);
1468   freeze() if @jobstep_todo;
1469   collate_output() if @jobstep_todo;
1470   cleanup();
1471   save_meta();
1472   die;
1473 }
1474
1475
1476 sub cleanup
1477 {
1478   return unless $Job;
1479   if ($Job->{'state'} eq 'Cancelled') {
1480     $Job->update_attributes('finished_at' => scalar gmtime);
1481   } else {
1482     $Job->update_attributes('state' => 'Failed');
1483   }
1484 }
1485
1486
1487 sub save_meta
1488 {
1489   my $justcheckpoint = shift; # false if this will be the last meta saved
1490   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1491   return unless log_writer_is_active();
1492
1493   my $loglocator = log_writer_finish();
1494   Log (undef, "log manifest is $loglocator");
1495   $Job->{'log'} = $loglocator;
1496   $Job->update_attributes('log', $loglocator);
1497 }
1498
1499
1500 sub freeze_if_want_freeze
1501 {
1502   if ($main::please_freeze)
1503   {
1504     release_allocation();
1505     if (@_)
1506     {
1507       # kill some srun procs before freeze+stop
1508       map { $proc{$_} = {} } @_;
1509       while (%proc)
1510       {
1511         killem (keys %proc);
1512         select (undef, undef, undef, 0.1);
1513         my $died;
1514         while (($died = waitpid (-1, WNOHANG)) > 0)
1515         {
1516           delete $proc{$died};
1517         }
1518       }
1519     }
1520     freeze();
1521     collate_output();
1522     cleanup();
1523     save_meta();
1524     exit 1;
1525   }
1526 }
1527
1528
1529 sub freeze
1530 {
1531   Log (undef, "Freeze not implemented");
1532   return;
1533 }
1534
1535
1536 sub thaw
1537 {
1538   croak ("Thaw not implemented");
1539 }
1540
1541
1542 sub freezequote
1543 {
1544   my $s = shift;
1545   $s =~ s/\\/\\\\/g;
1546   $s =~ s/\n/\\n/g;
1547   return $s;
1548 }
1549
1550
1551 sub freezeunquote
1552 {
1553   my $s = shift;
1554   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1555   return $s;
1556 }
1557
1558
1559 sub srun
1560 {
1561   my $srunargs = shift;
1562   my $execargs = shift;
1563   my $opts = shift || {};
1564   my $stdin = shift;
1565   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1566
1567   $Data::Dumper::Terse = 1;
1568   $Data::Dumper::Indent = 0;
1569   my $show_cmd = Dumper($args);
1570   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1571   $show_cmd =~ s/\n/ /g;
1572   warn "starting: $show_cmd\n";
1573
1574   if (defined $stdin) {
1575     my $child = open STDIN, "-|";
1576     defined $child or die "no fork: $!";
1577     if ($child == 0) {
1578       print $stdin or die $!;
1579       close STDOUT or die $!;
1580       exit 0;
1581     }
1582   }
1583
1584   return system (@$args) if $opts->{fork};
1585
1586   exec @$args;
1587   warn "ENV size is ".length(join(" ",%ENV));
1588   die "exec failed: $!: @$args";
1589 }
1590
1591
1592 sub ban_node_by_slot {
1593   # Don't start any new jobsteps on this node for 60 seconds
1594   my $slotid = shift;
1595   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1596   $slot[$slotid]->{node}->{hold_count}++;
1597   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1598 }
1599
1600 sub must_lock_now
1601 {
1602   my ($lockfile, $error_message) = @_;
1603   open L, ">", $lockfile or croak("$lockfile: $!");
1604   if (!flock L, LOCK_EX|LOCK_NB) {
1605     croak("Can't lock $lockfile: $error_message\n");
1606   }
1607 }
1608
1609 sub find_docker_image {
1610   # Given a Keep locator, check to see if it contains a Docker image.
1611   # If so, return its stream name and Docker hash.
1612   # If not, return undef for both values.
1613   my $locator = shift;
1614   my ($streamname, $filename);
1615   my $image = api_call("collections/get", uuid => $locator);
1616   if ($image) {
1617     foreach my $line (split(/\n/, $image->{manifest_text})) {
1618       my @tokens = split(/\s+/, $line);
1619       next if (!@tokens);
1620       $streamname = shift(@tokens);
1621       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1622         if (defined($filename)) {
1623           return (undef, undef);  # More than one file in the Collection.
1624         } else {
1625           $filename = (split(/:/, $filedata, 3))[2];
1626         }
1627       }
1628     }
1629   }
1630   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1631     return ($streamname, $1);
1632   } else {
1633     return (undef, undef);
1634   }
1635 }
1636
1637 sub retry_count {
1638   # Calculate the number of times an operation should be retried,
1639   # assuming exponential backoff, and that we're willing to retry as
1640   # long as tasks have been running.  Enforce a minimum of 3 retries.
1641   my ($starttime, $endtime, $timediff, $retries);
1642   if (@jobstep) {
1643     $starttime = $jobstep[0]->{starttime};
1644     $endtime = $jobstep[-1]->{finishtime};
1645   }
1646   if (!defined($starttime)) {
1647     $timediff = 0;
1648   } elsif (!defined($endtime)) {
1649     $timediff = time - $starttime;
1650   } else {
1651     $timediff = ($endtime - $starttime) - (time - $endtime);
1652   }
1653   if ($timediff > 0) {
1654     $retries = int(log($timediff) / log(2));
1655   } else {
1656     $retries = 1;  # Use the minimum.
1657   }
1658   return ($retries > 3) ? $retries : 3;
1659 }
1660
1661 sub retry_op {
1662   # Pass in two function references.
1663   # This method will be called with the remaining arguments.
1664   # If it dies, retry it with exponential backoff until it succeeds,
1665   # or until the current retry_count is exhausted.  After each failure
1666   # that can be retried, the second function will be called with
1667   # the current try count (0-based), next try time, and error message.
1668   my $operation = shift;
1669   my $retry_callback = shift;
1670   my $retries = retry_count();
1671   foreach my $try_count (0..$retries) {
1672     my $next_try = time + (2 ** $try_count);
1673     my $result = eval { $operation->(@_); };
1674     if (!$@) {
1675       return $result;
1676     } elsif ($try_count < $retries) {
1677       $retry_callback->($try_count, $next_try, $@);
1678       my $sleep_time = $next_try - time;
1679       sleep($sleep_time) if ($sleep_time > 0);
1680     }
1681   }
1682   # Ensure the error message ends in a newline, so Perl doesn't add
1683   # retry_op's line number to it.
1684   chomp($@);
1685   die($@ . "\n");
1686 }
1687
1688 sub api_call {
1689   # Pass in a /-separated API method name, and arguments for it.
1690   # This function will call that method, retrying as needed until
1691   # the current retry_count is exhausted, with a log on the first failure.
1692   my $method_name = shift;
1693   my $log_api_retry = sub {
1694     my ($try_count, $next_try_at, $errmsg) = @_;
1695     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1696     $errmsg =~ s/\s/ /g;
1697     $errmsg =~ s/\s+$//;
1698     my $retry_msg;
1699     if ($next_try_at < time) {
1700       $retry_msg = "Retrying.";
1701     } else {
1702       my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at);
1703       $retry_msg = "Retrying at $next_try_fmt.";
1704     }
1705     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1706   };
1707   my $method = $arv;
1708   foreach my $key (split(/\//, $method_name)) {
1709     $method = $method->{$key};
1710   }
1711   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1712 }
1713
1714 sub exit_status_s {
1715   # Given a $?, return a human-readable exit code string like "0" or
1716   # "1" or "0 with signal 1" or "1 with signal 11".
1717   my $exitcode = shift;
1718   my $s = $exitcode >> 8;
1719   if ($exitcode & 0x7f) {
1720     $s .= " with signal " . ($exitcode & 0x7f);
1721   }
1722   if ($exitcode & 0x80) {
1723     $s .= " with core dump";
1724   }
1725   return $s;
1726 }
1727
1728 sub handle_readall {
1729   # Pass in a glob reference to a file handle.
1730   # Read all its contents and return them as a string.
1731   my $fh_glob_ref = shift;
1732   local $/ = undef;
1733   return <$fh_glob_ref>;
1734 }
1735
1736 sub tar_filename_n {
1737   my $n = shift;
1738   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1739 }
1740
1741 sub add_git_archive {
1742   # Pass in a git archive command as a string or list, a la system().
1743   # This method will save its output to be included in the archive sent to the
1744   # build script.
1745   my $git_input;
1746   $git_tar_count++;
1747   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
1748     croak("Failed to save git archive: $!");
1749   }
1750   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
1751   close($git_input);
1752   waitpid($git_pid, 0);
1753   close(GIT_ARCHIVE);
1754   if ($?) {
1755     croak("Failed to save git archive: git exited " . exit_status_s($?));
1756   }
1757 }
1758
1759 sub combined_git_archive {
1760   # Combine all saved tar archives into a single archive, then return its
1761   # contents in a string.  Return undef if no archives have been saved.
1762   if ($git_tar_count < 1) {
1763     return undef;
1764   }
1765   my $base_tar_name = tar_filename_n(1);
1766   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
1767     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
1768     if ($tar_exit != 0) {
1769       croak("Error preparing build archive: tar -A exited " .
1770             exit_status_s($tar_exit));
1771     }
1772   }
1773   if (!open(GIT_TAR, "<", $base_tar_name)) {
1774     croak("Could not open build archive: $!");
1775   }
1776   my $tar_contents = handle_readall(\*GIT_TAR);
1777   close(GIT_TAR);
1778   return $tar_contents;
1779 }
1780
1781 __DATA__
1782 #!/usr/bin/perl
1783 #
1784 # This is crunch-job's internal dispatch script.  crunch-job running on the API
1785 # server invokes this script on individual compute nodes, or localhost if we're
1786 # running a job locally.  It gets called in two modes:
1787 #
1788 # * No arguments: Installation mode.  Read a tar archive from the DATA
1789 #   file handle; it includes the Crunch script's source code, and
1790 #   maybe SDKs as well.  Those should be installed in the proper
1791 #   locations.
1792 #
1793 # * With arguments: Crunch script run mode.  This script should set up the
1794 #   environment, then run the command specified in the arguments.
1795
1796 use Fcntl ':flock';
1797 use File::Path qw( make_path remove_tree );
1798 use POSIX qw(getcwd);
1799
1800 # Map SDK subdirectories to the path environments they belong to.
1801 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
1802
1803 my $destdir = $ENV{"CRUNCH_SRC"};
1804 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1805 my $repo = $ENV{"CRUNCH_SRC_URL"};
1806 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
1807 my $job_work = $ENV{"JOB_WORK"};
1808 my $task_work = $ENV{"TASK_WORK"};
1809
1810 for my $dir ($destdir, $job_work, $task_work) {
1811   if ($dir) {
1812     make_path $dir;
1813     -e $dir or die "Failed to create temporary directory ($dir): $!";
1814   }
1815 }
1816
1817 if ($task_work) {
1818   remove_tree($task_work, {keep_root => 1});
1819 }
1820
1821 if (@ARGV) {
1822   if (-e "$install_dir/bin/activate") {
1823     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
1824     @ARGV = ("/bin/sh", "-ec",
1825              ". \Q$install_dir/bin/activate\E; exec $orig_argv");
1826   }
1827   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
1828     my $sdk_path = "$install_dir/$sdk_dir";
1829     if (-d $sdk_path) {
1830       if ($ENV{$sdk_envkey}) {
1831         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
1832       } else {
1833         $ENV{$sdk_envkey} = $sdk_path;
1834       }
1835     }
1836   }
1837   exec(@ARGV);
1838   die "Cannot exec `@ARGV`: $!";
1839 }
1840
1841 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1842 flock L, LOCK_EX;
1843 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1844   # This version already installed -> nothing to do.
1845   exit(0);
1846 }
1847
1848 unlink "$destdir.commit";
1849 open STDERR_ORIG, ">&STDERR";
1850 open STDOUT, ">", "$destdir.log";
1851 open STDERR, ">&STDOUT";
1852
1853 mkdir $destdir;
1854 open TARX, "|-", "tar", "-xC", $destdir;
1855 {
1856   local $/ = undef;
1857   print TARX <DATA>;
1858 }
1859 if(!close(TARX)) {
1860   die "'tar -xC $destdir' exited $?: $!";
1861 }
1862
1863 mkdir $install_dir;
1864
1865 my $sdk_root = "$destdir/.arvados.sdk/sdk";
1866 if (-d $sdk_root) {
1867   if (can_run("virtualenv")) {
1868     shell_or_die("virtualenv", "--python=python2.7", "--system-site-packages",
1869                  $install_dir);
1870     shell_or_die("$install_dir/bin/pip", "install", "$sdk_root/python");
1871   }
1872
1873   foreach my $sdk_lang (map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS)) {
1874     if (-d "$sdk_root/$sdk_lang") {
1875       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
1876         die "Failed to install $sdk_lang SDK: $!";
1877       }
1878     }
1879   }
1880 }
1881
1882 if (-e "$destdir/crunch_scripts/install") {
1883     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1884 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1885     # Old version
1886     shell_or_die ("./tests/autotests.sh", $install_dir);
1887 } elsif (-e "./install.sh") {
1888     shell_or_die ("./install.sh", $install_dir);
1889 }
1890
1891 if ($commit) {
1892     unlink "$destdir.commit.new";
1893     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1894     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1895 }
1896
1897 close L;
1898
1899 sub can_run {
1900   my $command_name = shift;
1901   open(my $which, "-|", "which", $command_name);
1902   while (<$which>) { }
1903   close($which);
1904   return ($? == 0);
1905 }
1906
1907 sub shell_or_die
1908 {
1909   if ($ENV{"DEBUG"}) {
1910     print STDERR "@_\n";
1911   }
1912   if (system (@_) != 0) {
1913     my $err = $!;
1914     my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
1915     open STDERR, ">&STDERR_ORIG";
1916     system ("cat $destdir.log >&2");
1917     die "@_ failed ($err): $exitstatus";
1918   }
1919 }
1920
1921 __DATA__