8099: Remove duplicated line
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/env perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z --git-dir /path/to/repo/.git
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
19
20  crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
21
22 =head1 OPTIONS
23
24 =over
25
26 =item --force-unlock
27
28 If the job is already locked, steal the lock and run it anyway.
29
30 =item --git-dir
31
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
35
36 =item --job-api-token
37
38 Arvados API authorization token to use during the course of the job.
39
40 =item --no-clear-tmp
41
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs
44 locally.
45
46 =item --job
47
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
50
51 =back
52
53 =head1 RUNNING JOBS LOCALLY
54
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when
57 the job finishes.
58
59 If the job succeeds, the job's output locator is printed on stdout.
60
61 While the job is running, the following signals are accepted:
62
63 =over
64
65 =item control-C, SIGINT, SIGQUIT
66
67 Save a checkpoint, terminate any job tasks that are running, and stop.
68
69 =item SIGALRM
70
71 Save a checkpoint and continue.
72
73 =item SIGHUP
74
75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
78
79 =back
80
81 =cut
82
83
84 use strict;
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
88 use Arvados;
89 use Cwd qw(realpath);
90 use Data::Dumper;
91 use Digest::MD5 qw(md5_hex);
92 use Getopt::Long;
93 use IPC::Open2;
94 use IO::Select;
95 use File::Temp;
96 use Fcntl ':flock';
97 use File::Path qw( make_path remove_tree );
98
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
102
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106   if ($ENV{"USER"} ne "crunch" && $< != 0) {
107     # use a tmp dir unique for my uid
108     $ENV{"CRUNCH_TMP"} .= "-$<";
109   }
110 }
111
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
115 }
116
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
121
122 my %proc;
123 my $force_unlock;
124 my $git_dir;
125 my $jobspec;
126 my $job_api_token;
127 my $no_clear_tmp;
128 my $resume_stash;
129 my $docker_bin = "docker.io";
130 my $docker_run_args = "";
131 GetOptions('force-unlock' => \$force_unlock,
132            'git-dir=s' => \$git_dir,
133            'job=s' => \$jobspec,
134            'job-api-token=s' => \$job_api_token,
135            'no-clear-tmp' => \$no_clear_tmp,
136            'resume-stash=s' => \$resume_stash,
137            'docker-bin=s' => \$docker_bin,
138            'docker-run-args=s' => \$docker_run_args,
139     );
140
141 if (defined $job_api_token) {
142   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 }
144
145 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
146
147
148 $SIG{'USR1'} = sub
149 {
150   $main::ENV{CRUNCH_DEBUG} = 1;
151 };
152 $SIG{'USR2'} = sub
153 {
154   $main::ENV{CRUNCH_DEBUG} = 0;
155 };
156
157 my $arv = Arvados->new('apiVersion' => 'v1');
158
159 my $Job;
160 my $job_id;
161 my $dbh;
162 my $sth;
163 my @jobstep;
164
165 my $local_job;
166 if ($jobspec =~ /^[-a-z\d]+$/)
167 {
168   # $jobspec is an Arvados UUID, not a JSON job specification
169   $Job = api_call("jobs/get", uuid => $jobspec);
170   $local_job = 0;
171 }
172 else
173 {
174   $local_job = JSON::decode_json($jobspec);
175 }
176
177
178 # Make sure our workers (our slurm nodes, localhost, or whatever) are
179 # at least able to run basic commands: they aren't down or severely
180 # misconfigured.
181 my $cmd = ['true'];
182 if (($Job || $local_job)->{docker_image_locator}) {
183   $cmd = [$docker_bin, 'ps', '-q'];
184 }
185 Log(undef, "Sanity check is `@$cmd`");
186 my ($exited, $stdout, $stderr) = srun_sync(
187   ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
188   $cmd,
189   {label => "sanity check"});
190 if ($exited != 0) {
191   Log(undef, "Sanity check failed: ".exit_status_s($exited));
192   exit EX_TEMPFAIL;
193 }
194 Log(undef, "Sanity check OK");
195
196
197 my $User = api_call("users/current");
198
199 if (!$local_job) {
200   if (!$force_unlock) {
201     # Claim this job, and make sure nobody else does
202     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
203     if ($@) {
204       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
205       exit EX_TEMPFAIL;
206     };
207   }
208 }
209 else
210 {
211   if (!$resume_stash)
212   {
213     map { croak ("No $_ specified") unless $local_job->{$_} }
214     qw(script script_version script_parameters);
215   }
216
217   $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
218   $local_job->{'started_at'} = gmtime;
219   $local_job->{'state'} = 'Running';
220
221   $Job = api_call("jobs/create", job => $local_job);
222 }
223 $job_id = $Job->{'uuid'};
224
225 my $keep_logfile = $job_id . '.log.txt';
226 log_writer_start($keep_logfile);
227
228 $Job->{'runtime_constraints'} ||= {};
229 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
230 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
231
232 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
233 if ($? == 0) {
234   $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
235   chomp($gem_versions);
236   chop($gem_versions);  # Closing parentheses
237 } else {
238   $gem_versions = "";
239 }
240 Log(undef,
241     "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
242
243 Log (undef, "check slurm allocation");
244 my @slot;
245 my @node;
246 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
247 my @sinfo;
248 if (!$have_slurm)
249 {
250   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
251   push @sinfo, "$localcpus localhost";
252 }
253 if (exists $ENV{SLURM_NODELIST})
254 {
255   push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
256 }
257 foreach (@sinfo)
258 {
259   my ($ncpus, $slurm_nodelist) = split;
260   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
261
262   my @nodelist;
263   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
264   {
265     my $nodelist = $1;
266     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
267     {
268       my $ranges = $1;
269       foreach (split (",", $ranges))
270       {
271         my ($a, $b);
272         if (/(\d+)-(\d+)/)
273         {
274           $a = $1;
275           $b = $2;
276         }
277         else
278         {
279           $a = $_;
280           $b = $_;
281         }
282         push @nodelist, map {
283           my $n = $nodelist;
284           $n =~ s/\[[-,\d]+\]/$_/;
285           $n;
286         } ($a..$b);
287       }
288     }
289     else
290     {
291       push @nodelist, $nodelist;
292     }
293   }
294   foreach my $nodename (@nodelist)
295   {
296     Log (undef, "node $nodename - $ncpus slots");
297     my $node = { name => $nodename,
298                  ncpus => $ncpus,
299                  # The number of consecutive times a task has been dispatched
300                  # to this node and failed.
301                  losing_streak => 0,
302                  # The number of consecutive times that SLURM has reported
303                  # a node failure since the last successful task.
304                  fail_count => 0,
305                  # Don't dispatch work to this node until this time
306                  # (in seconds since the epoch) has passed.
307                  hold_until => 0 };
308     foreach my $cpu (1..$ncpus)
309     {
310       push @slot, { node => $node,
311                     cpu => $cpu };
312     }
313   }
314   push @node, @nodelist;
315 }
316
317
318
319 # Ensure that we get one jobstep running on each allocated node before
320 # we start overloading nodes with concurrent steps
321
322 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
323
324
325 $Job->update_attributes(
326   'tasks_summary' => { 'failed' => 0,
327                        'todo' => 1,
328                        'running' => 0,
329                        'done' => 0 });
330
331 Log (undef, "start");
332 $SIG{'INT'} = sub { $main::please_freeze = 1; };
333 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
334 $SIG{'TERM'} = \&croak;
335 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
336 $SIG{'ALRM'} = sub { $main::please_info = 1; };
337 $SIG{'CONT'} = sub { $main::please_continue = 1; };
338 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
339
340 $main::please_freeze = 0;
341 $main::please_info = 0;
342 $main::please_continue = 0;
343 $main::please_refresh = 0;
344 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
345
346 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
347 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
348 $ENV{"JOB_UUID"} = $job_id;
349
350
351 my @jobstep_todo = ();
352 my @jobstep_done = ();
353 my @jobstep_tomerge = ();
354 my $jobstep_tomerge_level = 0;
355 my $squeue_checked = 0;
356 my $latest_refresh = scalar time;
357
358
359
360 if (defined $Job->{thawedfromkey})
361 {
362   thaw ($Job->{thawedfromkey});
363 }
364 else
365 {
366   my $first_task = api_call("job_tasks/create", job_task => {
367     'job_uuid' => $Job->{'uuid'},
368     'sequence' => 0,
369     'qsequence' => 0,
370     'parameters' => {},
371   });
372   push @jobstep, { 'level' => 0,
373                    'failures' => 0,
374                    'arvados_task' => $first_task,
375                  };
376   push @jobstep_todo, 0;
377 }
378
379
380 if (!$have_slurm)
381 {
382   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
383 }
384
385 my $build_script = handle_readall(\*DATA);
386 my $nodelist = join(",", @node);
387 my $git_tar_count = 0;
388
389 if (!defined $no_clear_tmp) {
390   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
391   # up work directories crunch_tmp/work, crunch_tmp/opt,
392   # crunch_tmp/src*.
393   #
394   # TODO: When #5036 is done and widely deployed, we can limit mount's
395   # -t option to simply fuse.keep.
396   my ($exited, $stdout, $stderr) = srun_sync(
397     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398     ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
399     {label => "clean work dirs"});
400   if ($exited != 0) {
401     exit(EX_RETRY_UNLOCKED);
402   }
403 }
404
405 # If this job requires a Docker image, install that.
406 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
407 if ($docker_locator = $Job->{docker_image_locator}) {
408   Log (undef, "Install docker image $docker_locator");
409   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
410   if (!$docker_hash)
411   {
412     croak("No Docker image hash found from locator $docker_locator");
413   }
414   Log (undef, "docker image hash is $docker_hash");
415   $docker_stream =~ s/^\.//;
416   my $docker_install_script = qq{
417 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
418     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
419 fi
420 };
421
422   my ($exited, $stdout, $stderr) = srun_sync(
423     ["srun", "--nodelist=" . join(',', @node)],
424     ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
425     {label => "load docker image"});
426   if ($exited != 0)
427   {
428     exit(EX_RETRY_UNLOCKED);
429   }
430
431   # Determine whether this version of Docker supports memory+swap limits.
432   ($exited, $stdout, $stderr) = srun_sync(
433     ["srun", "--nodelist=" . $node[0]],
434     [$docker_bin, 'run', '--help'],
435     {label => "check --memory-swap feature"});
436   $docker_limitmem = ($stdout =~ /--memory-swap/);
437
438   # Find a non-root Docker user to use.
439   # Tries the default user for the container, then 'crunch', then 'nobody',
440   # testing for whether the actual user id is non-zero.  This defends against
441   # mistakes but not malice, but we intend to harden the security in the future
442   # so we don't want anyone getting used to their jobs running as root in their
443   # Docker containers.
444   my @tryusers = ("", "crunch", "nobody");
445   foreach my $try_user (@tryusers) {
446     my $label;
447     my $try_user_arg;
448     if ($try_user eq "") {
449       $label = "check whether default user is UID 0";
450       $try_user_arg = "";
451     } else {
452       $label = "check whether user '$try_user' is UID 0";
453       $try_user_arg = "--user=$try_user";
454     }
455     my ($exited, $stdout, $stderr) = srun_sync(
456       ["srun", "--nodelist=" . $node[0]],
457       ["/bin/sh", "-ec",
458        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
459       {label => $label});
460     chomp($stdout);
461     if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
462       $dockeruserarg = $try_user_arg;
463       if ($try_user eq "") {
464         Log(undef, "Container will run with default user");
465       } else {
466         Log(undef, "Container will run with $dockeruserarg");
467       }
468       last;
469     }
470   }
471
472   if (!defined $dockeruserarg) {
473     croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
474   }
475
476   if ($Job->{arvados_sdk_version}) {
477     # The job also specifies an Arvados SDK version.  Add the SDKs to the
478     # tar file for the build script to install.
479     Log(undef, sprintf("Packing Arvados SDK version %s for installation",
480                        $Job->{arvados_sdk_version}));
481     add_git_archive("git", "--git-dir=$git_dir", "archive",
482                     "--prefix=.arvados.sdk/",
483                     $Job->{arvados_sdk_version}, "sdk");
484   }
485 }
486
487 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
488   # If script_version looks like an absolute path, *and* the --git-dir
489   # argument was not given -- which implies we were not invoked by
490   # crunch-dispatch -- we will use the given path as a working
491   # directory instead of resolving script_version to a git commit (or
492   # doing anything else with git).
493   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
494   $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
495 }
496 else {
497   # Resolve the given script_version to a git commit sha1. Also, if
498   # the repository is remote, clone it into our local filesystem: this
499   # ensures "git archive" will work, and is necessary to reliably
500   # resolve a symbolic script_version like "master^".
501   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
502
503   Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
504
505   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
506
507   # If we're running under crunch-dispatch, it will have already
508   # pulled the appropriate source tree into its own repository, and
509   # given us that repo's path as $git_dir.
510   #
511   # If we're running a "local" job, we might have to fetch content
512   # from a remote repository.
513   #
514   # (Currently crunch-dispatch gives a local path with --git-dir, but
515   # we might as well accept URLs there too in case it changes its
516   # mind.)
517   my $repo = $git_dir || $Job->{'repository'};
518
519   # Repository can be remote or local. If remote, we'll need to fetch it
520   # to a local dir before doing `git log` et al.
521   my $repo_location;
522
523   if ($repo =~ m{://|^[^/]*:}) {
524     # $repo is a git url we can clone, like git:// or https:// or
525     # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
526     # not recognized here because distinguishing that from a local
527     # path is too fragile. If you really need something strange here,
528     # use the ssh:// form.
529     $repo_location = 'remote';
530   } elsif ($repo =~ m{^\.*/}) {
531     # $repo is a local path to a git index. We'll also resolve ../foo
532     # to ../foo/.git if the latter is a directory. To help
533     # disambiguate local paths from named hosted repositories, this
534     # form must be given as ./ or ../ if it's a relative path.
535     if (-d "$repo/.git") {
536       $repo = "$repo/.git";
537     }
538     $repo_location = 'local';
539   } else {
540     # $repo is none of the above. It must be the name of a hosted
541     # repository.
542     my $arv_repo_list = api_call("repositories/list",
543                                  'filters' => [['name','=',$repo]]);
544     my @repos_found = @{$arv_repo_list->{'items'}};
545     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
546     if ($n_found > 0) {
547       Log(undef, "Repository '$repo' -> "
548           . join(", ", map { $_->{'uuid'} } @repos_found));
549     }
550     if ($n_found != 1) {
551       croak("Error: Found $n_found repositories with name '$repo'.");
552     }
553     $repo = $repos_found[0]->{'fetch_url'};
554     $repo_location = 'remote';
555   }
556   Log(undef, "Using $repo_location repository '$repo'");
557   $ENV{"CRUNCH_SRC_URL"} = $repo;
558
559   # Resolve given script_version (we'll call that $treeish here) to a
560   # commit sha1 ($commit).
561   my $treeish = $Job->{'script_version'};
562   my $commit;
563   if ($repo_location eq 'remote') {
564     # We minimize excess object-fetching by re-using the same bare
565     # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
566     # just keep adding remotes to it as needed.
567     my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
568     my $gitcmd = "git --git-dir=\Q$local_repo\E";
569
570     # Set up our local repo for caching remote objects, making
571     # archives, etc.
572     if (!-d $local_repo) {
573       make_path($local_repo) or croak("Error: could not create $local_repo");
574     }
575     # This works (exits 0 and doesn't delete fetched objects) even
576     # if $local_repo is already initialized:
577     `$gitcmd init --bare`;
578     if ($?) {
579       croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
580     }
581
582     # If $treeish looks like a hash (or abbrev hash) we look it up in
583     # our local cache first, since that's cheaper. (We don't want to
584     # do that with tags/branches though -- those change over time, so
585     # they should always be resolved by the remote repo.)
586     if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
587       # Hide stderr because it's normal for this to fail:
588       my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
589       if ($? == 0 &&
590           # Careful not to resolve a branch named abcdeff to commit 1234567:
591           $sha1 =~ /^$treeish/ &&
592           $sha1 =~ /^([0-9a-f]{40})$/s) {
593         $commit = $1;
594         Log(undef, "Commit $commit already present in $local_repo");
595       }
596     }
597
598     if (!defined $commit) {
599       # If $treeish isn't just a hash or abbrev hash, or isn't here
600       # yet, we need to fetch the remote to resolve it correctly.
601
602       # First, remove all local heads. This prevents a name that does
603       # not exist on the remote from resolving to (or colliding with)
604       # a previously fetched branch or tag (possibly from a different
605       # remote).
606       remove_tree("$local_repo/refs/heads", {keep_root => 1});
607
608       Log(undef, "Fetching objects from $repo to $local_repo");
609       `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
610       if ($?) {
611         croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
612       }
613     }
614
615     # Now that the data is all here, we will use our local repo for
616     # the rest of our git activities.
617     $repo = $local_repo;
618   }
619
620   my $gitcmd = "git --git-dir=\Q$repo\E";
621   my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
622   unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
623     croak("`$gitcmd rev-list` exited "
624           .exit_status_s($?)
625           .", '$treeish' not found, giving up");
626   }
627   $commit = $1;
628   Log(undef, "Version $treeish is commit $commit");
629
630   if ($commit ne $Job->{'script_version'}) {
631     # Record the real commit id in the database, frozentokey, logs,
632     # etc. -- instead of an abbreviation or a branch name which can
633     # become ambiguous or point to a different commit in the future.
634     if (!$Job->update_attributes('script_version' => $commit)) {
635       croak("Error: failed to update job's script_version attribute");
636     }
637   }
638
639   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
640   add_git_archive("$gitcmd archive ''\Q$commit\E");
641 }
642
643 my $git_archive = combined_git_archive();
644 if (!defined $git_archive) {
645   Log(undef, "Skip install phase (no git archive)");
646   if ($have_slurm) {
647     Log(undef, "Warning: This probably means workers have no source tree!");
648   }
649 }
650 else {
651   my $exited;
652   my $install_script_tries_left = 3;
653   for (my $attempts = 0; $attempts < 3; $attempts++) {
654     my @srunargs = ("srun",
655                     "--nodelist=$nodelist",
656                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
657     my @execargs = ("sh", "-c",
658                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
659
660     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
661     my ($stdout, $stderr);
662     ($exited, $stdout, $stderr) = srun_sync(
663       \@srunargs, \@execargs,
664       {label => "run install script on all workers"},
665       $build_script . $git_archive);
666
667     my $stderr_anything_from_script = 0;
668     for my $line (split(/\n/, $stderr)) {
669       if ($line !~ /^(srun: error: |starting: \[)/) {
670         $stderr_anything_from_script = 1;
671       }
672     }
673
674     last if $exited == 0 || $main::please_freeze;
675
676     # If the install script fails but doesn't print an error message,
677     # the next thing anyone is likely to do is just run it again in
678     # case it was a transient problem like "slurm communication fails
679     # because the network isn't reliable enough". So we'll just do
680     # that ourselves (up to 3 attempts in total). OTOH, if there is an
681     # error message, the problem is more likely to have a real fix and
682     # we should fail the job so the fixing process can start, instead
683     # of doing 2 more attempts.
684     last if $stderr_anything_from_script;
685   }
686
687   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
688     unlink($tar_filename);
689   }
690
691   if ($exited != 0) {
692     croak("Giving up");
693   }
694 }
695
696 foreach (qw (script script_version script_parameters runtime_constraints))
697 {
698   Log (undef,
699        "$_ " .
700        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
701 }
702 foreach (split (/\n/, $Job->{knobs}))
703 {
704   Log (undef, "knob " . $_);
705 }
706
707
708
709 $main::success = undef;
710
711
712
713 ONELEVEL:
714
715 my $thisround_succeeded = 0;
716 my $thisround_failed = 0;
717 my $thisround_failed_multiple = 0;
718 my $working_slot_count = scalar(@slot);
719
720 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
721                        or $a <=> $b } @jobstep_todo;
722 my $level = $jobstep[$jobstep_todo[0]]->{level};
723
724 my $initial_tasks_this_level = 0;
725 foreach my $id (@jobstep_todo) {
726   $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
727 }
728
729 # If the number of tasks scheduled at this level #T is smaller than the number
730 # of slots available #S, only use the first #T slots, or the first slot on
731 # each node, whichever number is greater.
732 #
733 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
734 # based on these numbers.  Using fewer slots makes more resources available
735 # to each individual task, which should normally be a better strategy when
736 # there are fewer of them running with less parallelism.
737 #
738 # Note that this calculation is not redone if the initial tasks at
739 # this level queue more tasks at the same level.  This may harm
740 # overall task throughput for that level.
741 my @freeslot;
742 if ($initial_tasks_this_level < @node) {
743   @freeslot = (0..$#node);
744 } elsif ($initial_tasks_this_level < @slot) {
745   @freeslot = (0..$initial_tasks_this_level - 1);
746 } else {
747   @freeslot = (0..$#slot);
748 }
749 my $round_num_freeslots = scalar(@freeslot);
750 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
751
752 my %round_max_slots = ();
753 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
754   my $this_slot = $slot[$freeslot[$ii]];
755   my $node_name = $this_slot->{node}->{name};
756   $round_max_slots{$node_name} ||= $this_slot->{cpu};
757   last if (scalar(keys(%round_max_slots)) >= @node);
758 }
759
760 Log(undef, "start level $level with $round_num_freeslots slots");
761 my @holdslot;
762 my %reader;
763 my $progress_is_dirty = 1;
764 my $progress_stats_updated = 0;
765
766 update_progress_stats();
767
768
769 THISROUND:
770 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
771 {
772   # Don't create new tasks if we already know the job's final result.
773   last if defined($main::success);
774
775   my $id = $jobstep_todo[$todo_ptr];
776   my $Jobstep = $jobstep[$id];
777   if ($Jobstep->{level} != $level)
778   {
779     next;
780   }
781
782   pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
783   set_nonblocking($reader{$id});
784
785   my $childslot = $freeslot[0];
786   my $childnode = $slot[$childslot]->{node};
787   my $childslotname = join (".",
788                             $slot[$childslot]->{node}->{name},
789                             $slot[$childslot]->{cpu});
790
791   my $childpid = fork();
792   if ($childpid == 0)
793   {
794     $SIG{'INT'} = 'DEFAULT';
795     $SIG{'QUIT'} = 'DEFAULT';
796     $SIG{'TERM'} = 'DEFAULT';
797
798     foreach (values (%reader))
799     {
800       close($_);
801     }
802     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
803     open(STDOUT,">&writer");
804     open(STDERR,">&writer");
805
806     undef $dbh;
807     undef $sth;
808
809     delete $ENV{"GNUPGHOME"};
810     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
811     $ENV{"TASK_QSEQUENCE"} = $id;
812     $ENV{"TASK_SEQUENCE"} = $level;
813     $ENV{"JOB_SCRIPT"} = $Job->{script};
814     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
815       $param =~ tr/a-z/A-Z/;
816       $ENV{"JOB_PARAMETER_$param"} = $value;
817     }
818     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
819     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
820     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
821     $ENV{"HOME"} = $ENV{"TASK_WORK"};
822     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
823     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
824     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
825
826     my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
827
828     $ENV{"GZIP"} = "-n";
829
830     my @srunargs = (
831       "srun",
832       "--nodelist=".$childnode->{name},
833       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
834       "--job-name=$job_id.$id.$$",
835         );
836
837     my $stdbuf = " stdbuf --output=0 --error=0 ";
838
839     my $arv_file_cache = "";
840     if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
841       $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
842     }
843
844     my $command =
845         "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
846         ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
847         ."&& cd \Q$ENV{CRUNCH_TMP}\E "
848         # These environment variables get used explicitly later in
849         # $command.  No tool is expected to read these values directly.
850         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
851         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
852         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
853         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
854
855     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
856     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
857     $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
858
859     if ($docker_hash)
860     {
861       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
862       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
863       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
864       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
865       # We only set memory limits if Docker lets us limit both memory and swap.
866       # Memory limits alone have been supported longer, but subprocesses tend
867       # to get SIGKILL if they exceed that without any swap limit set.
868       # See #5642 for additional background.
869       if ($docker_limitmem) {
870         $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
871       }
872
873       # The source tree and $destdir directory (which we have
874       # installed on the worker host) are available in the container,
875       # under the same path.
876       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
877       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
878
879       # Currently, we make the "by_pdh" directory in arv-mount's mount
880       # point appear at /keep inside the container (instead of using
881       # the same path as the host like we do with CRUNCH_SRC and
882       # CRUNCH_INSTALL). However, crunch scripts and utilities must
883       # not rely on this. They must use $TASK_KEEPMOUNT.
884       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
885       $ENV{TASK_KEEPMOUNT} = "/keep";
886
887       # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
888       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
889       $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
890
891       # TASK_WORK is almost exactly like a docker data volume: it
892       # starts out empty, is writable, and persists until no
893       # containers use it any more. We don't use --volumes-from to
894       # share it with other containers: it is only accessible to this
895       # task, and it goes away when this task stops.
896       #
897       # However, a docker data volume is writable only by root unless
898       # the mount point already happens to exist in the container with
899       # different permissions. Therefore, we [1] assume /tmp already
900       # exists in the image and is writable by the crunch user; [2]
901       # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
902       # writable if they are created by docker while setting up the
903       # other --volumes); and [3] create $TASK_WORK inside the
904       # container using $build_script.
905       $command .= "--volume=/tmp ";
906       $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
907       $ENV{"HOME"} = $ENV{"TASK_WORK"};
908       $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
909
910       # TODO: Share a single JOB_WORK volume across all task
911       # containers on a given worker node, and delete it when the job
912       # ends (and, in case that doesn't work, when the next job
913       # starts).
914       #
915       # For now, use the same approach as TASK_WORK above.
916       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
917
918       while (my ($env_key, $env_val) = each %ENV)
919       {
920         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
921           $command .= "--env=\Q$env_key=$env_val\E ";
922         }
923       }
924       $command .= "--env=\QHOME=$ENV{HOME}\E ";
925       $command .= "\Q$docker_hash\E ";
926
927       if ($Job->{arvados_sdk_version}) {
928         $command .= $stdbuf;
929         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
930       } else {
931         $command .= "/bin/sh -c \'python -c " .
932             '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
933             ">&2 2>/dev/null; " .
934             "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
935             "if which stdbuf >/dev/null ; then " .
936             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
937             " else " .
938             "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
939             " fi\'";
940       }
941     } else {
942       # Non-docker run
943       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
944       $command .= $stdbuf;
945       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
946     }
947
948     my @execargs = ('bash', '-c', $command);
949     srun (\@srunargs, \@execargs, undef, $build_script);
950     # exec() failed, we assume nothing happened.
951     die "srun() failed on build script\n";
952   }
953   close("writer");
954   if (!defined $childpid)
955   {
956     close $reader{$id};
957     delete $reader{$id};
958     next;
959   }
960   shift @freeslot;
961   $proc{$childpid} = {
962     jobstepidx => $id,
963     time => time,
964     slot => $childslot,
965     jobstepname => "$job_id.$id.$childpid",
966   };
967   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
968   $slot[$childslot]->{pid} = $childpid;
969
970   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
971   Log ($id, "child $childpid started on $childslotname");
972   $Jobstep->{starttime} = time;
973   $Jobstep->{node} = $childnode->{name};
974   $Jobstep->{slotindex} = $childslot;
975   delete $Jobstep->{stderr};
976   delete $Jobstep->{finishtime};
977   delete $Jobstep->{tempfail};
978
979   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
980   $Jobstep->{'arvados_task'}->save;
981
982   splice @jobstep_todo, $todo_ptr, 1;
983   --$todo_ptr;
984
985   $progress_is_dirty = 1;
986
987   while (!@freeslot
988          ||
989          ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
990   {
991     last THISROUND if $main::please_freeze;
992     if ($main::please_info)
993     {
994       $main::please_info = 0;
995       freeze();
996       create_output_collection();
997       save_meta(1);
998       update_progress_stats();
999     }
1000     my $gotsome
1001         = readfrompipes ()
1002         + reapchildren ();
1003     if (!$gotsome || ($latest_refresh + 2 < scalar time))
1004     {
1005       check_refresh_wanted();
1006       check_squeue();
1007       update_progress_stats();
1008     }
1009     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1010     {
1011       update_progress_stats();
1012     }
1013     if (!$gotsome) {
1014       select (undef, undef, undef, 0.1);
1015     }
1016     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1017                                         $_->{node}->{hold_count} < 4 } @slot);
1018     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1019         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1020     {
1021       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1022           .($thisround_failed+$thisround_succeeded)
1023           .") -- giving up on this round";
1024       Log (undef, $message);
1025       last THISROUND;
1026     }
1027
1028     # move slots from freeslot to holdslot (or back to freeslot) if necessary
1029     for (my $i=$#freeslot; $i>=0; $i--) {
1030       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1031         push @holdslot, (splice @freeslot, $i, 1);
1032       }
1033     }
1034     for (my $i=$#holdslot; $i>=0; $i--) {
1035       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1036         push @freeslot, (splice @holdslot, $i, 1);
1037       }
1038     }
1039
1040     # give up if no nodes are succeeding
1041     if ($working_slot_count < 1) {
1042       Log(undef, "Every node has failed -- giving up");
1043       last THISROUND;
1044     }
1045   }
1046 }
1047
1048
1049 push @freeslot, splice @holdslot;
1050 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1051
1052
1053 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1054 while (%proc)
1055 {
1056   if ($main::please_continue) {
1057     $main::please_continue = 0;
1058     goto THISROUND;
1059   }
1060   $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1061   readfrompipes ();
1062   if (!reapchildren())
1063   {
1064     check_refresh_wanted();
1065     check_squeue();
1066     update_progress_stats();
1067     select (undef, undef, undef, 0.1);
1068     killem (keys %proc) if $main::please_freeze;
1069   }
1070 }
1071
1072 update_progress_stats();
1073 freeze_if_want_freeze();
1074
1075
1076 if (!defined $main::success)
1077 {
1078   if (!@jobstep_todo) {
1079     $main::success = 1;
1080   } elsif ($working_slot_count < 1) {
1081     save_output_collection();
1082     save_meta();
1083     exit(EX_RETRY_UNLOCKED);
1084   } elsif ($thisround_succeeded == 0 &&
1085            ($thisround_failed == 0 || $thisround_failed > 4)) {
1086     my $message = "stop because $thisround_failed tasks failed and none succeeded";
1087     Log (undef, $message);
1088     $main::success = 0;
1089   }
1090 }
1091
1092 goto ONELEVEL if !defined $main::success;
1093
1094
1095 release_allocation();
1096 freeze();
1097 my $collated_output = save_output_collection();
1098 Log (undef, "finish");
1099
1100 save_meta();
1101
1102 my $final_state;
1103 if ($collated_output && $main::success) {
1104   $final_state = 'Complete';
1105 } else {
1106   $final_state = 'Failed';
1107 }
1108 $Job->update_attributes('state' => $final_state);
1109
1110 exit (($final_state eq 'Complete') ? 0 : 1);
1111
1112
1113
1114 sub update_progress_stats
1115 {
1116   $progress_stats_updated = time;
1117   return if !$progress_is_dirty;
1118   my ($todo, $done, $running) = (scalar @jobstep_todo,
1119                                  scalar @jobstep_done,
1120                                  scalar keys(%proc));
1121   $Job->{'tasks_summary'} ||= {};
1122   $Job->{'tasks_summary'}->{'todo'} = $todo;
1123   $Job->{'tasks_summary'}->{'done'} = $done;
1124   $Job->{'tasks_summary'}->{'running'} = $running;
1125   $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1126   Log (undef, "status: $done done, $running running, $todo todo");
1127   $progress_is_dirty = 0;
1128 }
1129
1130
1131
1132 sub reapchildren
1133 {
1134   my $children_reaped = 0;
1135   while ((my $pid = waitpid (-1, WNOHANG)) > 0)
1136   {
1137     my $childstatus = $?;
1138
1139     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1140                     . "."
1141                     . $slot[$proc{$pid}->{slot}]->{cpu});
1142     my $jobstepidx = $proc{$pid}->{jobstepidx};
1143
1144     if (!WIFEXITED($childstatus))
1145     {
1146       # child did not exit (may be temporarily stopped)
1147       Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
1148       next;
1149     }
1150
1151     $children_reaped++;
1152     my $elapsed = time - $proc{$pid}->{time};
1153     my $Jobstep = $jobstep[$jobstepidx];
1154
1155     my $exitvalue = $childstatus >> 8;
1156     my $exitinfo = "exit ".exit_status_s($childstatus);
1157     $Jobstep->{'arvados_task'}->reload;
1158     my $task_success = $Jobstep->{'arvados_task'}->{success};
1159
1160     Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1161
1162     if (!defined $task_success) {
1163       # task did not indicate one way or the other --> fail
1164       Log($jobstepidx, sprintf(
1165             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1166             exit_status_s($childstatus)));
1167       $Jobstep->{'arvados_task'}->{success} = 0;
1168       $Jobstep->{'arvados_task'}->save;
1169       $task_success = 0;
1170     }
1171
1172     if (!$task_success)
1173     {
1174       my $temporary_fail;
1175       $temporary_fail ||= $Jobstep->{tempfail};
1176       $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1177
1178       ++$thisround_failed;
1179       ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1180
1181       # Check for signs of a failed or misconfigured node
1182       if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1183           2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1184         # Don't count this against jobstep failure thresholds if this
1185         # node is already suspected faulty and srun exited quickly
1186         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1187             $elapsed < 5) {
1188           Log ($jobstepidx, "blaming failure on suspect node " .
1189                $slot[$proc{$pid}->{slot}]->{node}->{name});
1190           $temporary_fail ||= 1;
1191         }
1192         ban_node_by_slot($proc{$pid}->{slot});
1193       }
1194
1195       Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1196                                 ++$Jobstep->{'failures'},
1197                                 $temporary_fail ? 'temporary' : 'permanent',
1198                                 $elapsed));
1199
1200       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1201         # Give up on this task, and the whole job
1202         $main::success = 0;
1203       }
1204       # Put this task back on the todo queue
1205       push @jobstep_todo, $jobstepidx;
1206       $Job->{'tasks_summary'}->{'failed'}++;
1207     }
1208     else
1209     {
1210       ++$thisround_succeeded;
1211       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1212       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1213       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1214       push @jobstep_done, $jobstepidx;
1215       Log ($jobstepidx, "success in $elapsed seconds");
1216     }
1217     $Jobstep->{exitcode} = $childstatus;
1218     $Jobstep->{finishtime} = time;
1219     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1220     $Jobstep->{'arvados_task'}->save;
1221     process_stderr_final ($jobstepidx);
1222     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1223                               length($Jobstep->{'arvados_task'}->{output}),
1224                               $Jobstep->{'arvados_task'}->{output}));
1225
1226     close $reader{$jobstepidx};
1227     delete $reader{$jobstepidx};
1228     delete $slot[$proc{$pid}->{slot}]->{pid};
1229     push @freeslot, $proc{$pid}->{slot};
1230     delete $proc{$pid};
1231
1232     if ($task_success) {
1233       # Load new tasks
1234       my $newtask_list = [];
1235       my $newtask_results;
1236       do {
1237         $newtask_results = api_call(
1238           "job_tasks/list",
1239           'where' => {
1240             'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1241           },
1242           'order' => 'qsequence',
1243           'offset' => scalar(@$newtask_list),
1244             );
1245         push(@$newtask_list, @{$newtask_results->{items}});
1246       } while (@{$newtask_results->{items}});
1247       foreach my $arvados_task (@$newtask_list) {
1248         my $jobstep = {
1249           'level' => $arvados_task->{'sequence'},
1250           'failures' => 0,
1251           'arvados_task' => $arvados_task
1252         };
1253         push @jobstep, $jobstep;
1254         push @jobstep_todo, $#jobstep;
1255       }
1256     }
1257     $progress_is_dirty = 1;
1258   }
1259
1260   return $children_reaped;
1261 }
1262
1263 sub check_refresh_wanted
1264 {
1265   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1266   if (@stat &&
1267       $stat[9] > $latest_refresh &&
1268       # ...and we have actually locked the job record...
1269       $job_id eq $Job->{'uuid'}) {
1270     $latest_refresh = scalar time;
1271     my $Job2 = api_call("jobs/get", uuid => $jobspec);
1272     for my $attr ('cancelled_at',
1273                   'cancelled_by_user_uuid',
1274                   'cancelled_by_client_uuid',
1275                   'state') {
1276       $Job->{$attr} = $Job2->{$attr};
1277     }
1278     if ($Job->{'state'} ne "Running") {
1279       if ($Job->{'state'} eq "Cancelled") {
1280         Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1281       } else {
1282         Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1283       }
1284       $main::success = 0;
1285       $main::please_freeze = 1;
1286     }
1287   }
1288 }
1289
1290 sub check_squeue
1291 {
1292   my $last_squeue_check = $squeue_checked;
1293
1294   # Do not call `squeue` or check the kill list more than once every
1295   # 15 seconds.
1296   return if $last_squeue_check > time - 15;
1297   $squeue_checked = time;
1298
1299   # Look for children from which we haven't received stderr data since
1300   # the last squeue check. If no such children exist, all procs are
1301   # alive and there's no need to even look at squeue.
1302   #
1303   # As long as the crunchstat poll interval (10s) is shorter than the
1304   # squeue check interval (15s) this should make the squeue check an
1305   # infrequent event.
1306   my $silent_procs = 0;
1307   for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1308   {
1309     if (!exists($js->{stderr_at}))
1310     {
1311       $js->{stderr_at} = 0;
1312     }
1313     if ($js->{stderr_at} < $last_squeue_check)
1314     {
1315       $silent_procs++;
1316     }
1317   }
1318   return if $silent_procs == 0;
1319
1320   # use killem() on procs whose killtime is reached
1321   while (my ($pid, $procinfo) = each %proc)
1322   {
1323     my $js = $jobstep[$procinfo->{jobstepidx}];
1324     if (exists $procinfo->{killtime}
1325         && $procinfo->{killtime} <= time
1326         && $js->{stderr_at} < $last_squeue_check)
1327     {
1328       my $sincewhen = "";
1329       if ($js->{stderr_at}) {
1330         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1331       }
1332       Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1333       killem ($pid);
1334     }
1335   }
1336
1337   if (!$have_slurm)
1338   {
1339     # here is an opportunity to check for mysterious problems with local procs
1340     return;
1341   }
1342
1343   # Get a list of steps still running.  Note: squeue(1) says --steps
1344   # selects a format (which we override anyway) and allows us to
1345   # specify which steps we're interested in (which we don't).
1346   # Importantly, it also changes the meaning of %j from "job name" to
1347   # "step name" and (although this isn't mentioned explicitly in the
1348   # docs) switches from "one line per job" mode to "one line per step"
1349   # mode. Without it, we'd just get a list of one job, instead of a
1350   # list of N steps.
1351   my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1352   if ($? != 0)
1353   {
1354     Log(undef, "warning: squeue exit status $? ($!)");
1355     return;
1356   }
1357   chop @squeue;
1358
1359   # which of my jobsteps are running, according to squeue?
1360   my %ok;
1361   for my $jobstepname (@squeue)
1362   {
1363     $ok{$jobstepname} = 1;
1364   }
1365
1366   # Check for child procs >60s old and not mentioned by squeue.
1367   while (my ($pid, $procinfo) = each %proc)
1368   {
1369     if ($procinfo->{time} < time - 60
1370         && $procinfo->{jobstepname}
1371         && !exists $ok{$procinfo->{jobstepname}}
1372         && !exists $procinfo->{killtime})
1373     {
1374       # According to slurm, this task has ended (successfully or not)
1375       # -- but our srun child hasn't exited. First we must wait (30
1376       # seconds) in case this is just a race between communication
1377       # channels. Then, if our srun child process still hasn't
1378       # terminated, we'll conclude some slurm communication
1379       # error/delay has caused the task to die without notifying srun,
1380       # and we'll kill srun ourselves.
1381       $procinfo->{killtime} = time + 30;
1382       Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
1383     }
1384   }
1385 }
1386
1387
1388 sub release_allocation
1389 {
1390   if ($have_slurm)
1391   {
1392     Log (undef, "release job allocation");
1393     system "scancel $ENV{SLURM_JOB_ID}";
1394   }
1395 }
1396
1397
1398 sub readfrompipes
1399 {
1400   my $gotsome = 0;
1401   my %fd_job;
1402   my $sel = IO::Select->new();
1403   foreach my $jobstepidx (keys %reader)
1404   {
1405     my $fd = $reader{$jobstepidx};
1406     $sel->add($fd);
1407     $fd_job{$fd} = $jobstepidx;
1408
1409     if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1410       $sel->add($stdout_fd);
1411       $fd_job{$stdout_fd} = $jobstepidx;
1412     }
1413   }
1414   # select on all reader fds with 0.1s timeout
1415   my @ready_fds = $sel->can_read(0.1);
1416   foreach my $fd (@ready_fds)
1417   {
1418     my $buf;
1419     if (0 < sysread ($fd, $buf, 65536))
1420     {
1421       $gotsome = 1;
1422       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1423
1424       my $jobstepidx = $fd_job{$fd};
1425       if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1426         $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1427         next;
1428       }
1429
1430       $jobstep[$jobstepidx]->{stderr_at} = time;
1431       $jobstep[$jobstepidx]->{stderr} .= $buf;
1432
1433       # Consume everything up to the last \n
1434       preprocess_stderr ($jobstepidx);
1435
1436       if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1437       {
1438         # If we get a lot of stderr without a newline, chop off the
1439         # front to avoid letting our buffer grow indefinitely.
1440         substr ($jobstep[$jobstepidx]->{stderr},
1441                 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
1442       }
1443     }
1444   }
1445   return $gotsome;
1446 }
1447
1448
1449 # Consume all full lines of stderr for a jobstep. Everything after the
1450 # last newline will remain in $jobstep[$jobstepidx]->{stderr} after
1451 # returning.
1452 sub preprocess_stderr
1453 {
1454   my $jobstepidx = shift;
1455
1456   while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
1457     my $line = $1;
1458     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
1459     Log ($jobstepidx, "stderr $line");
1460     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1461       # whoa.
1462       $main::please_freeze = 1;
1463     }
1464     elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
1465       # Skip the following tempfail checks if this srun proc isn't
1466       # attached to a particular worker slot.
1467     }
1468     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
1469       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
1470       $slot[$job_slot_index]->{node}->{fail_count}++;
1471       $jobstep[$jobstepidx]->{tempfail} = 1;
1472       ban_node_by_slot($job_slot_index);
1473     }
1474     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1475       $jobstep[$jobstepidx]->{tempfail} = 1;
1476       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
1477     }
1478     elsif ($line =~ /arvados\.errors\.Keep/) {
1479       $jobstep[$jobstepidx]->{tempfail} = 1;
1480     }
1481   }
1482 }
1483
1484
1485 sub process_stderr_final
1486 {
1487   my $jobstepidx = shift;
1488   preprocess_stderr ($jobstepidx);
1489
1490   map {
1491     Log ($jobstepidx, "stderr $_");
1492   } split ("\n", $jobstep[$jobstepidx]->{stderr});
1493   $jobstep[$jobstepidx]->{stderr} = '';
1494 }
1495
1496 sub fetch_block
1497 {
1498   my $hash = shift;
1499   my $keep;
1500   if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1501     Log(undef, "fetch_block run error from arv-get $hash: $!");
1502     return undef;
1503   }
1504   my $output_block = "";
1505   while (1) {
1506     my $buf;
1507     my $bytes = sysread($keep, $buf, 1024 * 1024);
1508     if (!defined $bytes) {
1509       Log(undef, "fetch_block read error from arv-get: $!");
1510       $output_block = undef;
1511       last;
1512     } elsif ($bytes == 0) {
1513       # sysread returns 0 at the end of the pipe.
1514       last;
1515     } else {
1516       # some bytes were read into buf.
1517       $output_block .= $buf;
1518     }
1519   }
1520   close $keep;
1521   if ($?) {
1522     Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1523     $output_block = undef;
1524   }
1525   return $output_block;
1526 }
1527
1528 # Create a collection by concatenating the output of all tasks (each
1529 # task's output is either a manifest fragment, a locator for a
1530 # manifest fragment stored in Keep, or nothing at all). Return the
1531 # portable_data_hash of the new collection.
1532 sub create_output_collection
1533 {
1534   Log (undef, "collate");
1535
1536   my ($child_out, $child_in);
1537   my $pid = open2($child_out, $child_in, 'python', '-c', q{
1538 import arvados
1539 import sys
1540 print (arvados.api("v1").collections().
1541        create(body={"manifest_text": sys.stdin.read()}).
1542        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1543 }, retry_count());
1544
1545   my $task_idx = -1;
1546   my $manifest_size = 0;
1547   for (@jobstep)
1548   {
1549     ++$task_idx;
1550     my $output = $_->{'arvados_task'}->{output};
1551     next if (!defined($output));
1552     my $next_write;
1553     if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1554       $next_write = fetch_block($output);
1555     } else {
1556       $next_write = $output;
1557     }
1558     if (defined($next_write)) {
1559       if (!defined(syswrite($child_in, $next_write))) {
1560         # There's been an error writing.  Stop the loop.
1561         # We'll log details about the exit code later.
1562         last;
1563       } else {
1564         $manifest_size += length($next_write);
1565       }
1566     } else {
1567       my $uuid = $_->{'arvados_task'}->{'uuid'};
1568       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1569       $main::success = 0;
1570     }
1571   }
1572   close($child_in);
1573   Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1574
1575   my $joboutput;
1576   my $s = IO::Select->new($child_out);
1577   if ($s->can_read(120)) {
1578     sysread($child_out, $joboutput, 1024 * 1024);
1579     waitpid($pid, 0);
1580     if ($?) {
1581       Log(undef, "output collection creation exited " . exit_status_s($?));
1582       $joboutput = undef;
1583     } else {
1584       chomp($joboutput);
1585     }
1586   } else {
1587     Log (undef, "timed out while creating output collection");
1588     foreach my $signal (2, 2, 2, 15, 15, 9) {
1589       kill($signal, $pid);
1590       last if waitpid($pid, WNOHANG) == -1;
1591       sleep(1);
1592     }
1593   }
1594   close($child_out);
1595
1596   return $joboutput;
1597 }
1598
1599 # Calls create_output_collection, logs the result, and returns it.
1600 # If that was successful, save that as the output in the job record.
1601 sub save_output_collection {
1602   my $collated_output = create_output_collection();
1603
1604   if (!$collated_output) {
1605     Log(undef, "Failed to write output collection");
1606   }
1607   else {
1608     Log(undef, "job output $collated_output");
1609     $Job->update_attributes('output' => $collated_output);
1610   }
1611   return $collated_output;
1612 }
1613
1614 sub killem
1615 {
1616   foreach (@_)
1617   {
1618     my $sig = 2;                # SIGINT first
1619     if (exists $proc{$_}->{"sent_$sig"} &&
1620         time - $proc{$_}->{"sent_$sig"} > 4)
1621     {
1622       $sig = 15;                # SIGTERM if SIGINT doesn't work
1623     }
1624     if (exists $proc{$_}->{"sent_$sig"} &&
1625         time - $proc{$_}->{"sent_$sig"} > 4)
1626     {
1627       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1628     }
1629     if (!exists $proc{$_}->{"sent_$sig"})
1630     {
1631       Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1632       kill $sig, $_;
1633       select (undef, undef, undef, 0.1);
1634       if ($sig == 2)
1635       {
1636         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1637       }
1638       $proc{$_}->{"sent_$sig"} = time;
1639       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1640     }
1641   }
1642 }
1643
1644
1645 sub fhbits
1646 {
1647   my($bits);
1648   for (@_) {
1649     vec($bits,fileno($_),1) = 1;
1650   }
1651   $bits;
1652 }
1653
1654
1655 # Send log output to Keep via arv-put.
1656 #
1657 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1658 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1659 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1660 # $log_pipe_pid is the pid of the arv-put subprocess.
1661 #
1662 # The only functions that should access these variables directly are:
1663 #
1664 # log_writer_start($logfilename)
1665 #     Starts an arv-put pipe, reading data on stdin and writing it to
1666 #     a $logfilename file in an output collection.
1667 #
1668 # log_writer_read_output([$timeout])
1669 #     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1670 #     Passes $timeout to the select() call, with a default of 0.01.
1671 #     Returns the result of the last read() call on $log_pipe_out, or
1672 #     -1 if read() wasn't called because select() timed out.
1673 #     Only other log_writer_* functions should need to call this.
1674 #
1675 # log_writer_send($txt)
1676 #     Writes $txt to the output log collection.
1677 #
1678 # log_writer_finish()
1679 #     Closes the arv-put pipe and returns the output that it produces.
1680 #
1681 # log_writer_is_active()
1682 #     Returns a true value if there is currently a live arv-put
1683 #     process, false otherwise.
1684 #
1685 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1686     $log_pipe_pid);
1687
1688 sub log_writer_start($)
1689 {
1690   my $logfilename = shift;
1691   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1692                         'arv-put',
1693                         '--stream',
1694                         '--retries', '3',
1695                         '--filename', $logfilename,
1696                         '-');
1697   $log_pipe_out_buf = "";
1698   $log_pipe_out_select = IO::Select->new($log_pipe_out);
1699 }
1700
1701 sub log_writer_read_output {
1702   my $timeout = shift || 0.01;
1703   my $read = -1;
1704   while ($read && $log_pipe_out_select->can_read($timeout)) {
1705     $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1706                  length($log_pipe_out_buf));
1707   }
1708   if (!defined($read)) {
1709     Log(undef, "error reading log manifest from arv-put: $!");
1710   }
1711   return $read;
1712 }
1713
1714 sub log_writer_send($)
1715 {
1716   my $txt = shift;
1717   print $log_pipe_in $txt;
1718   log_writer_read_output();
1719 }
1720
1721 sub log_writer_finish()
1722 {
1723   return unless $log_pipe_pid;
1724
1725   close($log_pipe_in);
1726
1727   my $logger_failed = 0;
1728   my $read_result = log_writer_read_output(120);
1729   if ($read_result == -1) {
1730     $logger_failed = -1;
1731     Log (undef, "timed out reading from 'arv-put'");
1732   } elsif ($read_result != 0) {
1733     $logger_failed = -2;
1734     Log(undef, "failed to read arv-put log manifest to EOF");
1735   }
1736
1737   waitpid($log_pipe_pid, 0);
1738   if ($?) {
1739     $logger_failed ||= $?;
1740     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1741   }
1742
1743   close($log_pipe_out);
1744   my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1745   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1746       $log_pipe_out_select = undef;
1747
1748   return $arv_put_output;
1749 }
1750
1751 sub log_writer_is_active() {
1752   return $log_pipe_pid;
1753 }
1754
1755 sub Log                         # ($jobstepidx, $logmessage)
1756 {
1757   my ($jobstepidx, $logmessage) = @_;
1758   if ($logmessage =~ /\n/) {
1759     for my $line (split (/\n/, $_[1])) {
1760       Log ($jobstepidx, $line);
1761     }
1762     return;
1763   }
1764   my $fh = select STDERR; $|=1; select $fh;
1765   my $task_qseq = '';
1766   if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1767     $task_qseq = $jobstepidx;
1768   }
1769   my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1770   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1771   $message .= "\n";
1772   my $datetime;
1773   if (log_writer_is_active() || -t STDERR) {
1774     my @gmtime = gmtime;
1775     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1776                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1777   }
1778   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1779
1780   if (log_writer_is_active()) {
1781     log_writer_send($datetime . " " . $message);
1782   }
1783 }
1784
1785
1786 sub croak
1787 {
1788   my ($package, $file, $line) = caller;
1789   my $message = "@_ at $file line $line\n";
1790   Log (undef, $message);
1791   freeze() if @jobstep_todo;
1792   create_output_collection() if @jobstep_todo;
1793   cleanup();
1794   save_meta();
1795   die;
1796 }
1797
1798
1799 sub cleanup
1800 {
1801   return unless $Job;
1802   if ($Job->{'state'} eq 'Cancelled') {
1803     $Job->update_attributes('finished_at' => scalar gmtime);
1804   } else {
1805     $Job->update_attributes('state' => 'Failed');
1806   }
1807 }
1808
1809
1810 sub save_meta
1811 {
1812   my $justcheckpoint = shift; # false if this will be the last meta saved
1813   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1814   return unless log_writer_is_active();
1815   my $log_manifest = log_writer_finish();
1816   return unless defined($log_manifest);
1817
1818   if ($Job->{log}) {
1819     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1820     $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1821   }
1822
1823   my $log_coll = api_call(
1824     "collections/create", ensure_unique_name => 1, collection => {
1825       manifest_text => $log_manifest,
1826       owner_uuid => $Job->{owner_uuid},
1827       name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1828     });
1829   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1830   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1831 }
1832
1833
1834 sub freeze_if_want_freeze
1835 {
1836   if ($main::please_freeze)
1837   {
1838     release_allocation();
1839     if (@_)
1840     {
1841       # kill some srun procs before freeze+stop
1842       map { $proc{$_} = {} } @_;
1843       while (%proc)
1844       {
1845         killem (keys %proc);
1846         select (undef, undef, undef, 0.1);
1847         my $died;
1848         while (($died = waitpid (-1, WNOHANG)) > 0)
1849         {
1850           delete $proc{$died};
1851         }
1852       }
1853     }
1854     freeze();
1855     create_output_collection();
1856     cleanup();
1857     save_meta();
1858     exit 1;
1859   }
1860 }
1861
1862
1863 sub freeze
1864 {
1865   Log (undef, "Freeze not implemented");
1866   return;
1867 }
1868
1869
1870 sub thaw
1871 {
1872   croak ("Thaw not implemented");
1873 }
1874
1875
1876 sub freezequote
1877 {
1878   my $s = shift;
1879   $s =~ s/\\/\\\\/g;
1880   $s =~ s/\n/\\n/g;
1881   return $s;
1882 }
1883
1884
1885 sub freezeunquote
1886 {
1887   my $s = shift;
1888   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1889   return $s;
1890 }
1891
1892
1893 sub srun_sync
1894 {
1895   my $srunargs = shift;
1896   my $execargs = shift;
1897   my $opts = shift || {};
1898   my $stdin = shift;
1899
1900   my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1901   Log (undef, "$label: start");
1902
1903   my ($stderr_r, $stderr_w);
1904   pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1905
1906   my ($stdout_r, $stdout_w);
1907   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1908
1909   my $srunpid = fork();
1910   if ($srunpid == 0)
1911   {
1912     close($stderr_r);
1913     close($stdout_r);
1914     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1915     fcntl($stdout_w, F_SETFL, 0) or croak($!);
1916     open(STDERR, ">&", $stderr_w);
1917     open(STDOUT, ">&", $stdout_w);
1918     srun ($srunargs, $execargs, $opts, $stdin);
1919     exit (1);
1920   }
1921   close($stderr_w);
1922   close($stdout_w);
1923
1924   set_nonblocking($stderr_r);
1925   set_nonblocking($stdout_r);
1926
1927   # Add entries to @jobstep and %proc so check_squeue() and
1928   # freeze_if_want_freeze() can treat it like a job task process.
1929   push @jobstep, {
1930     stderr => '',
1931     stderr_at => 0,
1932     stderr_captured => '',
1933     stdout_r => $stdout_r,
1934     stdout_captured => '',
1935   };
1936   my $jobstepidx = $#jobstep;
1937   $proc{$srunpid} = {
1938     jobstepidx => $jobstepidx,
1939   };
1940   $reader{$jobstepidx} = $stderr_r;
1941
1942   while ($srunpid != waitpid ($srunpid, WNOHANG)) {
1943     my $busy = readfrompipes();
1944     if (!$busy || ($latest_refresh + 2 < scalar time)) {
1945       check_refresh_wanted();
1946       check_squeue();
1947     }
1948     if (!$busy) {
1949       select(undef, undef, undef, 0.1);
1950     }
1951     killem(keys %proc) if $main::please_freeze;
1952   }
1953   my $exited = $?;
1954
1955   1 while readfrompipes();
1956   process_stderr_final ($jobstepidx);
1957
1958   Log (undef, "$label: exit ".exit_status_s($exited));
1959
1960   close($stdout_r);
1961   close($stderr_r);
1962   delete $proc{$srunpid};
1963   delete $reader{$jobstepidx};
1964
1965   my $j = pop @jobstep;
1966   return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
1967 }
1968
1969
1970 sub srun
1971 {
1972   my $srunargs = shift;
1973   my $execargs = shift;
1974   my $opts = shift || {};
1975   my $stdin = shift;
1976   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1977
1978   $Data::Dumper::Terse = 1;
1979   $Data::Dumper::Indent = 0;
1980   my $show_cmd = Dumper($args);
1981   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1982   $show_cmd =~ s/\n/ /g;
1983   if ($opts->{fork}) {
1984     Log(undef, "starting: $show_cmd");
1985   } else {
1986     # This is a child process: parent is in charge of reading our
1987     # stderr and copying it to Log() if needed.
1988     warn "starting: $show_cmd\n";
1989   }
1990
1991   if (defined $stdin) {
1992     my $child = open STDIN, "-|";
1993     defined $child or die "no fork: $!";
1994     if ($child == 0) {
1995       print $stdin or die $!;
1996       close STDOUT or die $!;
1997       exit 0;
1998     }
1999   }
2000
2001   return system (@$args) if $opts->{fork};
2002
2003   exec @$args;
2004   warn "ENV size is ".length(join(" ",%ENV));
2005   die "exec failed: $!: @$args";
2006 }
2007
2008
2009 sub ban_node_by_slot {
2010   # Don't start any new jobsteps on this node for 60 seconds
2011   my $slotid = shift;
2012   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
2013   $slot[$slotid]->{node}->{hold_count}++;
2014   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
2015 }
2016
2017 sub must_lock_now
2018 {
2019   my ($lockfile, $error_message) = @_;
2020   open L, ">", $lockfile or croak("$lockfile: $!");
2021   if (!flock L, LOCK_EX|LOCK_NB) {
2022     croak("Can't lock $lockfile: $error_message\n");
2023   }
2024 }
2025
2026 sub find_docker_image {
2027   # Given a Keep locator, check to see if it contains a Docker image.
2028   # If so, return its stream name and Docker hash.
2029   # If not, return undef for both values.
2030   my $locator = shift;
2031   my ($streamname, $filename);
2032   my $image = api_call("collections/get", uuid => $locator);
2033   if ($image) {
2034     foreach my $line (split(/\n/, $image->{manifest_text})) {
2035       my @tokens = split(/\s+/, $line);
2036       next if (!@tokens);
2037       $streamname = shift(@tokens);
2038       foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
2039         if (defined($filename)) {
2040           return (undef, undef);  # More than one file in the Collection.
2041         } else {
2042           $filename = (split(/:/, $filedata, 3))[2];
2043         }
2044       }
2045     }
2046   }
2047   if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
2048     return ($streamname, $1);
2049   } else {
2050     return (undef, undef);
2051   }
2052 }
2053
2054 sub retry_count {
2055   # Calculate the number of times an operation should be retried,
2056   # assuming exponential backoff, and that we're willing to retry as
2057   # long as tasks have been running.  Enforce a minimum of 3 retries.
2058   my ($starttime, $endtime, $timediff, $retries);
2059   if (@jobstep) {
2060     $starttime = $jobstep[0]->{starttime};
2061     $endtime = $jobstep[-1]->{finishtime};
2062   }
2063   if (!defined($starttime)) {
2064     $timediff = 0;
2065   } elsif (!defined($endtime)) {
2066     $timediff = time - $starttime;
2067   } else {
2068     $timediff = ($endtime - $starttime) - (time - $endtime);
2069   }
2070   if ($timediff > 0) {
2071     $retries = int(log($timediff) / log(2));
2072   } else {
2073     $retries = 1;  # Use the minimum.
2074   }
2075   return ($retries > 3) ? $retries : 3;
2076 }
2077
2078 sub retry_op {
2079   # Pass in two function references.
2080   # This method will be called with the remaining arguments.
2081   # If it dies, retry it with exponential backoff until it succeeds,
2082   # or until the current retry_count is exhausted.  After each failure
2083   # that can be retried, the second function will be called with
2084   # the current try count (0-based), next try time, and error message.
2085   my $operation = shift;
2086   my $retry_callback = shift;
2087   my $retries = retry_count();
2088   foreach my $try_count (0..$retries) {
2089     my $next_try = time + (2 ** $try_count);
2090     my $result = eval { $operation->(@_); };
2091     if (!$@) {
2092       return $result;
2093     } elsif ($try_count < $retries) {
2094       $retry_callback->($try_count, $next_try, $@);
2095       my $sleep_time = $next_try - time;
2096       sleep($sleep_time) if ($sleep_time > 0);
2097     }
2098   }
2099   # Ensure the error message ends in a newline, so Perl doesn't add
2100   # retry_op's line number to it.
2101   chomp($@);
2102   die($@ . "\n");
2103 }
2104
2105 sub api_call {
2106   # Pass in a /-separated API method name, and arguments for it.
2107   # This function will call that method, retrying as needed until
2108   # the current retry_count is exhausted, with a log on the first failure.
2109   my $method_name = shift;
2110   my $log_api_retry = sub {
2111     my ($try_count, $next_try_at, $errmsg) = @_;
2112     $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2113     $errmsg =~ s/\s/ /g;
2114     $errmsg =~ s/\s+$//;
2115     my $retry_msg;
2116     if ($next_try_at < time) {
2117       $retry_msg = "Retrying.";
2118     } else {
2119       my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2120       $retry_msg = "Retrying at $next_try_fmt.";
2121     }
2122     Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2123   };
2124   my $method = $arv;
2125   foreach my $key (split(/\//, $method_name)) {
2126     $method = $method->{$key};
2127   }
2128   return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
2129 }
2130
2131 sub exit_status_s {
2132   # Given a $?, return a human-readable exit code string like "0" or
2133   # "1" or "0 with signal 1" or "1 with signal 11".
2134   my $exitcode = shift;
2135   my $s = $exitcode >> 8;
2136   if ($exitcode & 0x7f) {
2137     $s .= " with signal " . ($exitcode & 0x7f);
2138   }
2139   if ($exitcode & 0x80) {
2140     $s .= " with core dump";
2141   }
2142   return $s;
2143 }
2144
2145 sub handle_readall {
2146   # Pass in a glob reference to a file handle.
2147   # Read all its contents and return them as a string.
2148   my $fh_glob_ref = shift;
2149   local $/ = undef;
2150   return <$fh_glob_ref>;
2151 }
2152
2153 sub tar_filename_n {
2154   my $n = shift;
2155   return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
2156 }
2157
2158 sub add_git_archive {
2159   # Pass in a git archive command as a string or list, a la system().
2160   # This method will save its output to be included in the archive sent to the
2161   # build script.
2162   my $git_input;
2163   $git_tar_count++;
2164   if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2165     croak("Failed to save git archive: $!");
2166   }
2167   my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2168   close($git_input);
2169   waitpid($git_pid, 0);
2170   close(GIT_ARCHIVE);
2171   if ($?) {
2172     croak("Failed to save git archive: git exited " . exit_status_s($?));
2173   }
2174 }
2175
2176 sub combined_git_archive {
2177   # Combine all saved tar archives into a single archive, then return its
2178   # contents in a string.  Return undef if no archives have been saved.
2179   if ($git_tar_count < 1) {
2180     return undef;
2181   }
2182   my $base_tar_name = tar_filename_n(1);
2183   foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2184     my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2185     if ($tar_exit != 0) {
2186       croak("Error preparing build archive: tar -A exited " .
2187             exit_status_s($tar_exit));
2188     }
2189   }
2190   if (!open(GIT_TAR, "<", $base_tar_name)) {
2191     croak("Could not open build archive: $!");
2192   }
2193   my $tar_contents = handle_readall(\*GIT_TAR);
2194   close(GIT_TAR);
2195   return $tar_contents;
2196 }
2197
2198 sub set_nonblocking {
2199   my $fh = shift;
2200   my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2201   fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2202 }
2203
2204 __DATA__
2205 #!/usr/bin/env perl
2206 #
2207 # This is crunch-job's internal dispatch script.  crunch-job running on the API
2208 # server invokes this script on individual compute nodes, or localhost if we're
2209 # running a job locally.  It gets called in two modes:
2210 #
2211 # * No arguments: Installation mode.  Read a tar archive from the DATA
2212 #   file handle; it includes the Crunch script's source code, and
2213 #   maybe SDKs as well.  Those should be installed in the proper
2214 #   locations.  This runs outside of any Docker container, so don't try to
2215 #   introspect Crunch's runtime environment.
2216 #
2217 # * With arguments: Crunch script run mode.  This script should set up the
2218 #   environment, then run the command specified in the arguments.  This runs
2219 #   inside any Docker container.
2220
2221 use Fcntl ':flock';
2222 use File::Path qw( make_path remove_tree );
2223 use POSIX qw(getcwd);
2224
2225 use constant TASK_TEMPFAIL => 111;
2226
2227 # Map SDK subdirectories to the path environments they belong to.
2228 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2229
2230 my $destdir = $ENV{"CRUNCH_SRC"};
2231 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2232 my $repo = $ENV{"CRUNCH_SRC_URL"};
2233 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2234 my $job_work = $ENV{"JOB_WORK"};
2235 my $task_work = $ENV{"TASK_WORK"};
2236
2237 open(STDOUT_ORIG, ">&", STDOUT);
2238 open(STDERR_ORIG, ">&", STDERR);
2239
2240 for my $dir ($destdir, $job_work, $task_work) {
2241   if ($dir) {
2242     make_path $dir;
2243     -e $dir or die "Failed to create temporary directory ($dir): $!";
2244   }
2245 }
2246
2247 if ($task_work) {
2248   remove_tree($task_work, {keep_root => 1});
2249 }
2250
2251 ### Crunch script run mode
2252 if (@ARGV) {
2253   # We want to do routine logging during task 0 only.  This gives the user
2254   # the information they need, but avoids repeating the information for every
2255   # task.
2256   my $Log;
2257   if ($ENV{TASK_SEQUENCE} eq "0") {
2258     $Log = sub {
2259       my $msg = shift;
2260       printf STDERR_ORIG "[Crunch] $msg\n", @_;
2261     };
2262   } else {
2263     $Log = sub { };
2264   }
2265
2266   my $python_src = "$install_dir/python";
2267   my $venv_dir = "$job_work/.arvados.venv";
2268   my $venv_built = -e "$venv_dir/bin/activate";
2269   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2270     shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2271                  "--python=python2.7", $venv_dir);
2272     shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2273     $venv_built = 1;
2274     $Log->("Built Python SDK virtualenv");
2275   }
2276
2277   my @pysdk_version_cmd = ("python", "-c",
2278     "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2279   if ($venv_built) {
2280     $Log->("Running in Python SDK virtualenv");
2281     @pysdk_version_cmd = ();
2282     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2283     @ARGV = ("/bin/sh", "-ec",
2284              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2285   } elsif (-d $python_src) {
2286     $Log->("Warning: virtualenv not found inside Docker container default " .
2287            "\$PATH. Can't install Python SDK.");
2288   }
2289
2290   if (@pysdk_version_cmd) {
2291     open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
2292     my $pysdk_version = <$pysdk_version_pipe>;
2293     close($pysdk_version_pipe);
2294     if ($? == 0) {
2295       chomp($pysdk_version);
2296       $Log->("Using Arvados SDK version $pysdk_version");
2297     } else {
2298       # A lot could've gone wrong here, but pretty much all of it means that
2299       # Python won't be able to load the Arvados SDK.
2300       $Log->("Warning: Arvados SDK not found");
2301     }
2302   }
2303
2304   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2305     my $sdk_path = "$install_dir/$sdk_dir";
2306     if (-d $sdk_path) {
2307       if ($ENV{$sdk_envkey}) {
2308         $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2309       } else {
2310         $ENV{$sdk_envkey} = $sdk_path;
2311       }
2312       $Log->("Arvados SDK added to %s", $sdk_envkey);
2313     }
2314   }
2315
2316   exec(@ARGV);
2317   die "Cannot exec `@ARGV`: $!";
2318 }
2319
2320 ### Installation mode
2321 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2322 flock L, LOCK_EX;
2323 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2324   # This exact git archive (source + arvados sdk) is already installed
2325   # here, so there's no need to reinstall it.
2326
2327   # We must consume our DATA section, though: otherwise the process
2328   # feeding it to us will get SIGPIPE.
2329   my $buf;
2330   while (read(DATA, $buf, 65536)) { }
2331
2332   exit(0);
2333 }
2334
2335 unlink "$destdir.archive_hash";
2336 mkdir $destdir;
2337
2338 do {
2339   # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2340   local $SIG{PIPE} = "IGNORE";
2341   warn "Extracting archive: $archive_hash\n";
2342   # --ignore-zeros is necessary sometimes: depending on how much NUL
2343   # padding tar -A put on our combined archive (which in turn depends
2344   # on the length of the component archives) tar without
2345   # --ignore-zeros will exit before consuming stdin and cause close()
2346   # to fail on the resulting SIGPIPE.
2347   if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2348     die "Error launching 'tar -xC $destdir': $!";
2349   }
2350   # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2351   # get SIGPIPE.  We must feed it data incrementally.
2352   my $tar_input;
2353   while (read(DATA, $tar_input, 65536)) {
2354     print TARX $tar_input;
2355   }
2356   if(!close(TARX)) {
2357     die "'tar -xC $destdir' exited $?: $!";
2358   }
2359 };
2360
2361 mkdir $install_dir;
2362
2363 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2364 if (-d $sdk_root) {
2365   foreach my $sdk_lang (("python",
2366                          map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2367     if (-d "$sdk_root/$sdk_lang") {
2368       if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2369         die "Failed to install $sdk_lang SDK: $!";
2370       }
2371     }
2372   }
2373 }
2374
2375 my $python_dir = "$install_dir/python";
2376 if ((-d $python_dir) and can_run("python2.7")) {
2377   open(my $egg_info_pipe, "-|",
2378        "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2379   my @egg_info_errors = <$egg_info_pipe>;
2380   close($egg_info_pipe);
2381
2382   if ($?) {
2383     if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2384       # egg_info apparently failed because it couldn't ask git for a build tag.
2385       # Specify no build tag.
2386       open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2387       print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2388       close($pysdk_cfg);
2389     } else {
2390       my $egg_info_exit = $? >> 8;
2391       foreach my $errline (@egg_info_errors) {
2392         warn $errline;
2393       }
2394       warn "python setup.py egg_info failed: exit $egg_info_exit";
2395       exit ($egg_info_exit || 1);
2396     }
2397   }
2398 }
2399
2400 # Hide messages from the install script (unless it fails: shell_or_die
2401 # will show $destdir.log in that case).
2402 open(STDOUT, ">>", "$destdir.log");
2403 open(STDERR, ">&", STDOUT);
2404
2405 if (-e "$destdir/crunch_scripts/install") {
2406     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2407 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2408     # Old version
2409     shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2410 } elsif (-e "./install.sh") {
2411     shell_or_die (undef, "./install.sh", $install_dir);
2412 }
2413
2414 if ($archive_hash) {
2415     unlink "$destdir.archive_hash.new";
2416     symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2417     rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2418 }
2419
2420 close L;
2421
2422 sub can_run {
2423   my $command_name = shift;
2424   open(my $which, "-|", "which", $command_name);
2425   while (<$which>) { }
2426   close($which);
2427   return ($? == 0);
2428 }
2429
2430 sub shell_or_die
2431 {
2432   my $exitcode = shift;
2433
2434   if ($ENV{"DEBUG"}) {
2435     print STDERR "@_\n";
2436   }
2437   if (system (@_) != 0) {
2438     my $err = $!;
2439     my $code = $?;
2440     my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2441     open STDERR, ">&STDERR_ORIG";
2442     system ("cat $destdir.log >&2");
2443     warn "@_ failed ($err): $exitstatus";
2444     if (defined($exitcode)) {
2445       exit $exitcode;
2446     }
2447     else {
2448       exit (($code >> 8) || 1);
2449     }
2450   }
2451 }
2452
2453 __DATA__