2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z --git-dir /path/to/repo/.git
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
20 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
28 If the job is already locked, steal the lock and run it anyway.
32 Path to a .git directory (or a git URL) where the commit given in the
33 job's C<script_version> attribute is to be found. If this is I<not>
34 given, the job's C<repository> attribute will be used.
38 Arvados API authorization token to use during the course of the job.
42 Do not clear per-job/task temporary directories during initial job
43 setup. This can speed up development and debugging when running jobs locally.
48 UUID of the job to run, or a JSON-encoded job resource without a
49 UUID. If the latter is given, a new job object will be created.
53 =head1 RUNNING JOBS LOCALLY
55 crunch-job's log messages appear on stderr along with the job tasks'
56 stderr streams. The log is saved in Keep at each checkpoint and when the job finishes.
59 If the job succeeds, the job's output locator is printed on stdout.
61 While the job is running, the following signals are accepted:
65 =item control-C, SIGINT, SIGQUIT
67 Save a checkpoint, terminate any job tasks that are running, and stop.
=item SIGALRM

71 Save a checkpoint and continue.
=item SIGHUP

75 Refresh node allocation (i.e., check whether any nodes have been added
76 or unallocated) and attributes of the Job record that should affect
77 behavior (e.g., cancel job if cancelled_at becomes non-nil).
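
For example, from another shell (a sketch, assuming a crunch-job pid of
1234):

  kill -INT 1234   # save a checkpoint, stop running tasks, and exit
  kill -HUP 1234   # refresh node allocation and job attributes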
85 use POSIX ':sys_wait_h';
86 use POSIX qw(strftime);
87 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
91 use Digest::MD5 qw(md5_hex);
97 use File::Path qw( make_path remove_tree );
99 use constant TASK_TEMPFAIL => 111;
100 use constant EX_TEMPFAIL => 75;
101 use constant EX_RETRY_UNLOCKED => 93;
103 $ENV{"TMPDIR"} ||= "/tmp";
104 unless (defined $ENV{"CRUNCH_TMP"}) {
105 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
106 if ($ENV{"USER"} ne "crunch" && $< != 0) {
107 # use a tmp dir unique for my uid
108 $ENV{"CRUNCH_TMP"} .= "-$<";
112 # Create the tmp directory if it does not exist
113 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
114 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
117 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
118 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
119 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
120 mkdir ($ENV{"JOB_WORK"});
129 my $docker_bin = "/usr/bin/docker.io";
130 GetOptions('force-unlock' => \$force_unlock,
131 'git-dir=s' => \$git_dir,
132 'job=s' => \$jobspec,
133 'job-api-token=s' => \$job_api_token,
134 'no-clear-tmp' => \$no_clear_tmp,
135 'resume-stash=s' => \$resume_stash,
136 'docker-bin=s' => \$docker_bin,
139 if (defined $job_api_token) {
140 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
143 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
148 $main::ENV{CRUNCH_DEBUG} = 1;
152 $main::ENV{CRUNCH_DEBUG} = 0;
155 my $arv = Arvados->new('apiVersion' => 'v1');
164 if ($jobspec =~ /^[-a-z\d]+$/)
166 # $jobspec is an Arvados UUID, not a JSON job specification
167 $Job = api_call("jobs/get", uuid => $jobspec);
172 $local_job = JSON::decode_json($jobspec);
176 # Make sure our workers (our slurm nodes, localhost, or whatever) are
177 # at least able to run basic commands: they aren't down or severely broken.
180 if (($Job || $local_job)->{docker_image_locator}) {
181 $cmd = [$docker_bin, 'ps', '-q'];
183 Log(undef, "Sanity check is `@$cmd`");
184 srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
188 Log(undef, "Sanity check failed: ".exit_status_s($?));
191 Log(undef, "Sanity check OK");
194 my $User = api_call("users/current");
197 if (!$force_unlock) {
198 # Claim this job, and make sure nobody else does
199 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
201 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
210 croak ("No $_ specified") foreach grep { !$local_job->{$_} } qw(script script_version script_parameters);
214 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
215 $local_job->{'started_at'} = gmtime;
216 $local_job->{'state'} = 'Running';
218 $Job = api_call("jobs/create", job => $local_job);
220 $job_id = $Job->{'uuid'};
222 my $keep_logfile = $job_id . '.log.txt';
223 log_writer_start($keep_logfile);
225 $Job->{'runtime_constraints'} ||= {};
226 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
227 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
229 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
231 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
232 chomp($gem_versions);
233 chop($gem_versions); # Remove the closing parenthesis
238 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
240 Log (undef, "check slurm allocation");
243 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (e.g. "4(x3),2,4(x2)")
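# A minimal sketch of decoding that format, should we ever switch
# (hypothetical, not executed here; "4(x3),2" would yield 4,4,4,2):
#   my @ntasks;
#   foreach my $ent (split /,/, $ENV{SLURM_TASKS_PER_NODE}) {
#     my ($n, $rep) = $ent =~ /^(\d+)(?:\(x(\d+)\))?$/;
#     push @ntasks, ($n) x ($rep || 1);
#   }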
247 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
248 push @sinfo, "$localcpus localhost";
250 if (exists $ENV{SLURM_NODELIST})
252 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
256 my ($ncpus, $slurm_nodelist) = split;
257 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
260 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
263 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
266 foreach (split (",", $ranges))
279 push @nodelist, map {
281 $n =~ s/\[[-,\d]+\]/$_/;
288 push @nodelist, $nodelist;
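# For example, a $slurm_nodelist entry like "compute[0-2,5]" expands to
# compute0 compute1 compute2 compute5 in @nodelist (assuming slurm's
# bracketed-range hostname syntax handled above).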
291 foreach my $nodename (@nodelist)
293 Log (undef, "node $nodename - $ncpus slots");
294 my $node = { name => $nodename,
296 # The number of consecutive times a task has been dispatched
297 # to this node and failed.
299 # The number of consecutive times that SLURM has reported
300 # a node failure since the last successful task.
302 # Don't dispatch work to this node until this time
303 # (in seconds since the epoch) has passed.
305 foreach my $cpu (1..$ncpus)
307 push @slot, { node => $node,
311 push @node, @nodelist;
316 # Ensure that we get one jobstep running on each allocated node before
317 # we start overloading nodes with concurrent steps
319 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
322 $Job->update_attributes(
323 'tasks_summary' => { 'failed' => 0,
328 Log (undef, "start");
329 $SIG{'INT'} = sub { $main::please_freeze = 1; };
330 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
331 $SIG{'TERM'} = \&croak;
332 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
333 $SIG{'ALRM'} = sub { $main::please_info = 1; };
334 $SIG{'CONT'} = sub { $main::please_continue = 1; };
335 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
337 $main::please_freeze = 0;
338 $main::please_info = 0;
339 $main::please_continue = 0;
340 $main::please_refresh = 0;
341 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
343 foreach (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ }
344 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
345 $ENV{"JOB_UUID"} = $job_id;
348 my @jobstep_todo = ();
349 my @jobstep_done = ();
350 my @jobstep_tomerge = ();
351 my $jobstep_tomerge_level = 0;
352 my $squeue_checked = 0;
353 my $latest_refresh = scalar time;
357 if (defined $Job->{thawedfromkey})
359 thaw ($Job->{thawedfromkey});
363 my $first_task = api_call("job_tasks/create", job_task => {
364 'job_uuid' => $Job->{'uuid'},
369 push @jobstep, { 'level' => 0,
371 'arvados_task' => $first_task,
373 push @jobstep_todo, 0;
379 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
382 my $build_script = handle_readall(\*DATA);
383 my $nodelist = join(",", @node);
384 my $git_tar_count = 0;
386 if (!defined $no_clear_tmp) {
387 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
388 Log (undef, "Clean work dirs");
390 my $cleanpid = fork();
393 # Find FUSE mounts that look like Keep mounts (the mount path has the
394 # word "keep") and unmount them. Then clean up work directories.
395 # TODO: When #5036 is done and widely deployed, we can get rid of the
396 # regular expression and just unmount everything with type fuse.keep.
397 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398 ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
403 last if $cleanpid == waitpid (-1, WNOHANG);
404 freeze_if_want_freeze ($cleanpid);
405 select (undef, undef, undef, 0.1);
407 Log (undef, "Cleanup command exited ".exit_status_s($?));
410 # If this job requires a Docker image, install that.
411 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
412 if ($docker_locator = $Job->{docker_image_locator}) {
413 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
416 croak("No Docker image hash found from locator $docker_locator");
418 $docker_stream =~ s/^\.//;
419 my $docker_install_script = qq{
420 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
421 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
424 my $docker_pid = fork();
425 if ($docker_pid == 0)
427 srun (["srun", "--nodelist=" . join(',', @node)],
428 ["/bin/sh", "-ec", $docker_install_script]);
433 last if $docker_pid == waitpid (-1, WNOHANG);
434 freeze_if_want_freeze ($docker_pid);
435 select (undef, undef, undef, 0.1);
439 croak("Installing Docker image from $docker_locator exited "
443 # Determine whether this version of Docker supports memory+swap limits.
444 srun(["srun", "--nodelist=" . $node[0]],
445 ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
447 $docker_limitmem = ($? == 0);
449 if ($Job->{arvados_sdk_version}) {
450 # The job also specifies an Arvados SDK version. Add the SDKs to the
451 # tar file for the build script to install.
452 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
453 $Job->{arvados_sdk_version}));
454 add_git_archive("git", "--git-dir=$git_dir", "archive",
455 "--prefix=.arvados.sdk/",
456 $Job->{arvados_sdk_version}, "sdk");
460 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
461 # If script_version looks like an absolute path, *and* the --git-dir
462 # argument was not given -- which implies we were not invoked by
463 # crunch-dispatch -- we will use the given path as a working
464 # directory instead of resolving script_version to a git commit (or
465 # doing anything else with git).
466 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
467 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
470 # Resolve the given script_version to a git commit sha1. Also, if
471 # the repository is remote, clone it into our local filesystem: this
472 # ensures "git archive" will work, and is necessary to reliably
473 # resolve a symbolic script_version like "master^".
474 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
476 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
478 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
480 # If we're running under crunch-dispatch, it will have already
481 # pulled the appropriate source tree into its own repository, and
482 # given us that repo's path as $git_dir.
484 # If we're running a "local" job, we might have to fetch content
485 # from a remote repository.
487 # (Currently crunch-dispatch gives a local path with --git-dir, but
488 # we might as well accept URLs there too in case it changes its mind.)
490 my $repo = $git_dir || $Job->{'repository'};
492 # Repository can be remote or local. If remote, we'll need to fetch it
493 # to a local dir before doing `git log` et al.
496 if ($repo =~ m{://|^[^/]*:}) {
497 # $repo is a git url we can clone, like git:// or https:// or
498 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
499 # not recognized here because distinguishing that from a local
500 # path is too fragile. If you really need something strange here,
501 # use the ssh:// form.
502 $repo_location = 'remote';
503 } elsif ($repo =~ m{^\.*/}) {
504 # $repo is a local path to a git directory. We'll also resolve ../foo
505 # to ../foo/.git if the latter is a directory. To help
506 # disambiguate local paths from named hosted repositories, this
507 # form must be given as ./ or ../ if it's a relative path.
508 if (-d "$repo/.git") {
509 $repo = "$repo/.git";
511 $repo_location = 'local';
513 # $repo is none of the above. It must be the name of a hosted repository.
515 my $arv_repo_list = api_call("repositories/list",
516 'filters' => [['name','=',$repo]]);
517 my @repos_found = @{$arv_repo_list->{'items'}};
518 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
520 Log(undef, "Repository '$repo' -> "
521 . join(", ", map { $_->{'uuid'} } @repos_found));
524 croak("Error: Found $n_found repositories with name '$repo'.");
526 $repo = $repos_found[0]->{'fetch_url'};
527 $repo_location = 'remote';
529 Log(undef, "Using $repo_location repository '$repo'");
530 $ENV{"CRUNCH_SRC_URL"} = $repo;
532 # Resolve given script_version (we'll call that $treeish here) to a
533 # commit sha1 ($commit).
534 my $treeish = $Job->{'script_version'};
536 if ($repo_location eq 'remote') {
537 # We minimize excess object-fetching by re-using the same bare
538 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
539 # just keep adding remotes to it as needed.
540 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
541 my $gitcmd = "git --git-dir=\Q$local_repo\E";
543 # Set up our local repo for caching remote objects, making sure it exists and is initialized.
545 if (!-d $local_repo) {
546 make_path($local_repo) or croak("Error: could not create $local_repo");
548 # This works (exits 0 and doesn't delete fetched objects) even
549 # if $local_repo is already initialized:
550 `$gitcmd init --bare`;
552 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
555 # If $treeish looks like a hash (or abbrev hash) we look it up in
556 # our local cache first, since that's cheaper. (We don't want to
557 # do that with tags/branches though -- those change over time, so
558 # they should always be resolved by the remote repo.)
559 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
560 # Hide stderr because it's normal for this to fail:
561 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
563 # Careful not to resolve a branch named abcdeff to commit 1234567:
564 $sha1 =~ /^$treeish/ &&
565 $sha1 =~ /^([0-9a-f]{40})$/s) {
567 Log(undef, "Commit $commit already present in $local_repo");
571 if (!defined $commit) {
572 # If $treeish isn't just a hash or abbrev hash, or isn't here
573 # yet, we need to fetch the remote to resolve it correctly.
575 # First, remove all local heads. This prevents a name that does
576 # not exist on the remote from resolving to (or colliding with)
577 # a previously fetched branch or tag (possibly from a different repo).
579 remove_tree("$local_repo/refs/heads", {keep_root => 1});
581 Log(undef, "Fetching objects from $repo to $local_repo");
582 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
584 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
588 # Now that the data is all here, we will use our local repo for
589 # the rest of our git activities.
593 my $gitcmd = "git --git-dir=\Q$repo\E";
594 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
595 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
596 croak("`$gitcmd rev-list` exited "
598 .", '$treeish' not found. Giving up.");
601 Log(undef, "Version $treeish is commit $commit");
603 if ($commit ne $Job->{'script_version'}) {
604 # Record the real commit id in the database, frozentokey, logs,
605 # etc. -- instead of an abbreviation or a branch name which can
606 # become ambiguous or point to a different commit in the future.
607 if (!$Job->update_attributes('script_version' => $commit)) {
608 croak("Error: failed to update job's script_version attribute");
612 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
613 add_git_archive("$gitcmd archive ''\Q$commit\E");
616 my $git_archive = combined_git_archive();
617 if (!defined $git_archive) {
618 Log(undef, "Skip install phase (no git archive)");
620 Log(undef, "Warning: This probably means workers have no source tree!");
625 my $install_script_tries_left = 3;
626 for (my $attempts = 0; $attempts < $install_script_tries_left; $attempts++) {
627 Log(undef, "Run install script on all workers");
629 my @srunargs = ("srun",
630 "--nodelist=$nodelist",
631 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
632 my @execargs = ("sh", "-c",
633 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
635 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
636 my ($install_stderr_r, $install_stderr_w);
637 pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
638 set_nonblocking($install_stderr_r);
639 my $installpid = fork();
640 if ($installpid == 0)
642 close($install_stderr_r);
643 fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
644 open(STDOUT, ">&", $install_stderr_w);
645 open(STDERR, ">&", $install_stderr_w);
646 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
649 close($install_stderr_w);
650 # Tell freeze_if_want_freeze how to kill the child, otherwise the
651 # "waitpid(installpid)" loop won't get interrupted by a freeze:
652 $proc{$installpid} = {};
654 # Track whether anything appears on stderr other than slurm errors
655 # ("srun: ...") and the "starting: ..." message printed by the
656 # srun subroutine itself:
657 my $stderr_anything_from_script = 0;
658 my $match_our_own_errors = '^(srun: error: |starting: \[)';
659 while ($installpid != waitpid(-1, WNOHANG)) {
660 freeze_if_want_freeze ($installpid);
661 # Wait up to 0.1 seconds for something to appear on stderr, then
662 # do a non-blocking read.
663 my $bits = fhbits($install_stderr_r);
664 select ($bits, undef, $bits, 0.1);
665 if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
667 while ($stderr_buf =~ /^(.*?)\n/) {
669 substr $stderr_buf, 0, 1+length($line), "";
670 Log(undef, "stderr $line");
671 if ($line !~ /$match_our_own_errors/) {
672 $stderr_anything_from_script = 1;
677 delete $proc{$installpid};
678 $install_exited = $?;
679 close($install_stderr_r);
680 if (length($stderr_buf) > 0) {
681 if ($stderr_buf !~ /$match_our_own_errors/) {
682 $stderr_anything_from_script = 1;
684 Log(undef, "stderr $stderr_buf")
687 Log (undef, "Install script exited ".exit_status_s($install_exited));
688 last if $install_exited == 0 || $main::please_freeze;
689 # If the install script fails but doesn't print an error message,
690 # the next thing anyone is likely to do is just run it again in
691 # case it was a transient problem like "slurm communication fails
692 # because the network isn't reliable enough". So we'll just do
693 # that ourselves (up to 3 attempts in total). OTOH, if there is an
694 # error message, the problem is more likely to have a real fix and
695 # we should fail the job so the fixing process can start, instead
696 # of doing 2 more attempts.
697 last if $stderr_anything_from_script;
700 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
701 unlink($tar_filename);
704 if ($install_exited != 0) {
709 foreach (qw (script script_version script_parameters runtime_constraints))
713 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
715 foreach (split (/\n/, $Job->{knobs}))
717 Log (undef, "knob " . $_);
722 $main::success = undef;
728 my $thisround_succeeded = 0;
729 my $thisround_failed = 0;
730 my $thisround_failed_multiple = 0;
731 my $working_slot_count = scalar(@slot);
733 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
734 or $a <=> $b } @jobstep_todo;
735 my $level = $jobstep[$jobstep_todo[0]]->{level};
737 my $initial_tasks_this_level = 0;
738 foreach my $id (@jobstep_todo) {
739 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
742 # If the number of tasks scheduled at this level #T is smaller than the number
743 # of slots available #S, only use the first #T slots, or the first slot on
744 # each node, whichever number is greater.
746 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
747 # based on these numbers. Using fewer slots makes more resources available
748 # to each individual task, which should normally be a better strategy when
749 # there are fewer of them running with less parallelism.
751 # Note that this calculation is not redone if the initial tasks at
752 # this level queue more tasks at the same level. This may harm
753 # overall task throughput for that level.
755 if ($initial_tasks_this_level < @node) {
756 @freeslot = (0..$#node);
757 } elsif ($initial_tasks_this_level < @slot) {
758 @freeslot = (0..$initial_tasks_this_level - 1);
760 @freeslot = (0..$#slot);
762 my $round_num_freeslots = scalar(@freeslot);
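# Worked example (a sketch): with 3 nodes x 4 slots each and 5 tasks at
# this level, 5 >= 3 nodes but 5 < 12 slots, so @freeslot holds the
# first 5 slots -- one on each node plus a second slot on two of them,
# since @slot was sorted by cpu number above.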
764 my %round_max_slots = ();
765 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
766 my $this_slot = $slot[$freeslot[$ii]];
767 my $node_name = $this_slot->{node}->{name};
768 $round_max_slots{$node_name} ||= $this_slot->{cpu};
769 last if (scalar(keys(%round_max_slots)) >= @node);
772 Log(undef, "start level $level with $round_num_freeslots slots");
775 my $progress_is_dirty = 1;
776 my $progress_stats_updated = 0;
778 update_progress_stats();
782 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
784 my $id = $jobstep_todo[$todo_ptr];
785 my $Jobstep = $jobstep[$id];
786 if ($Jobstep->{level} != $level)
791 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
792 set_nonblocking($reader{$id});
794 my $childslot = $freeslot[0];
795 my $childnode = $slot[$childslot]->{node};
796 my $childslotname = join (".",
797 $slot[$childslot]->{node}->{name},
798 $slot[$childslot]->{cpu});
800 my $childpid = fork();
803 $SIG{'INT'} = 'DEFAULT';
804 $SIG{'QUIT'} = 'DEFAULT';
805 $SIG{'TERM'} = 'DEFAULT';
807 foreach (values (%reader))
811 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
812 open(STDOUT,">&writer");
813 open(STDERR,">&writer");
818 delete $ENV{"GNUPGHOME"};
819 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
820 $ENV{"TASK_QSEQUENCE"} = $id;
821 $ENV{"TASK_SEQUENCE"} = $level;
822 $ENV{"JOB_SCRIPT"} = $Job->{script};
823 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
824 $param =~ tr/a-z/A-Z/;
825 $ENV{"JOB_PARAMETER_$param"} = $value;
827 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
828 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
829 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
830 $ENV{"HOME"} = $ENV{"TASK_WORK"};
831 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
832 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
833 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
834 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
840 "--nodelist=".$childnode->{name},
841 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
842 "--job-name=$job_id.$id.$$",
845 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
846 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
847 ."&& cd $ENV{CRUNCH_TMP} "
848 # These environment variables get used explicitly later in
849 # $command. No tool is expected to read these values directly.
850 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
851 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
852 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
853 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
854 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
857 my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
858 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
859 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
860 # We only set memory limits if Docker lets us limit both memory and swap.
861 # Memory limits alone have been supported longer, but subprocesses tend
862 # to get SIGKILL if they exceed that without any swap limit set.
863 # See #5642 for additional background.
864 if ($docker_limitmem) {
865 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
868 # Dynamically configure the container to use the host system as its
869 # DNS server. Get the host's global addresses from the ip command,
870 # and turn them into docker --dns options using gawk.
872 q{$(ip -o address show scope global |
873 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
875 # The source tree and $destdir directory (which we have
876 # installed on the worker host) are available in the container,
877 # under the same path.
878 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
879 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
881 # Currently, we make arv-mount's mount point appear at /keep
882 # inside the container (instead of using the same path as the
883 # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
884 # crunch scripts and utilities must not rely on this. They must
885 # use $TASK_KEEPMOUNT.
886 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
887 $ENV{TASK_KEEPMOUNT} = "/keep";
889 # TASK_WORK is almost exactly like a docker data volume: it
890 # starts out empty, is writable, and persists until no
891 # containers use it any more. We don't use --volumes-from to
892 # share it with other containers: it is only accessible to this
893 # task, and it goes away when this task stops.
895 # However, a docker data volume is writable only by root unless
896 # the mount point already happens to exist in the container with
897 # different permissions. Therefore, we [1] assume /tmp already
898 # exists in the image and is writable by the crunch user; [2]
899 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
900 # writable if they are created by docker while setting up the
901 # other --volumes); and [3] create $TASK_WORK inside the
902 # container using $build_script.
903 $command .= "--volume=/tmp ";
904 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
905 $ENV{"HOME"} = $ENV{"TASK_WORK"};
906 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
908 # TODO: Share a single JOB_WORK volume across all task
909 # containers on a given worker node, and delete it when the job
910 # ends (and, in case that doesn't work, when the next job starts).
913 # For now, use the same approach as TASK_WORK above.
914 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
916 while (my ($env_key, $env_val) = each %ENV)
918 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
919 $command .= "--env=\Q$env_key=$env_val\E ";
922 $command .= "--env=\QHOME=$ENV{HOME}\E ";
923 $command .= "\Q$docker_hash\E ";
924 $command .= "stdbuf --output=0 --error=0 ";
925 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
928 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
929 $command .= "stdbuf --output=0 --error=0 ";
930 $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
933 my @execargs = ('bash', '-c', $command);
934 srun (\@srunargs, \@execargs, undef, $build_script);
935 # exec() failed, we assume nothing happened.
936 die "srun() failed on build script\n";
939 if (!defined $childpid)
946 $proc{$childpid} = { jobstep => $id,
949 jobstepname => "$job_id.$id.$childpid",
951 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
952 $slot[$childslot]->{pid} = $childpid;
954 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
955 Log ($id, "child $childpid started on $childslotname");
956 $Jobstep->{starttime} = time;
957 $Jobstep->{node} = $childnode->{name};
958 $Jobstep->{slotindex} = $childslot;
959 delete $Jobstep->{stderr};
960 delete $Jobstep->{finishtime};
961 delete $Jobstep->{tempfail};
963 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
964 $Jobstep->{'arvados_task'}->save;
966 splice @jobstep_todo, $todo_ptr, 1;
969 $progress_is_dirty = 1;
973 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
975 last THISROUND if $main::please_freeze || defined($main::success);
976 if ($main::please_info)
978 $main::please_info = 0;
980 create_output_collection();
982 update_progress_stats();
989 check_refresh_wanted();
991 update_progress_stats();
992 select (undef, undef, undef, 0.1);
994 elsif (time - $progress_stats_updated >= 30)
996 update_progress_stats();
998 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
999 $_->{node}->{hold_count} < 4 } @slot);
1000 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1001 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
1003 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1004 .($thisround_failed+$thisround_succeeded)
1005 .") -- giving up on this round";
1006 Log (undef, $message);
1010 # move slots from freeslot to holdslot (or back to freeslot) if necessary
1011 for (my $i=$#freeslot; $i>=0; $i--) {
1012 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1013 push @holdslot, (splice @freeslot, $i, 1);
1016 for (my $i=$#holdslot; $i>=0; $i--) {
1017 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1018 push @freeslot, (splice @holdslot, $i, 1);
1022 # give up if no nodes are succeeding
1023 if ($working_slot_count < 1) {
1024 Log(undef, "Every node has failed -- giving up");
1031 push @freeslot, splice @holdslot;
1032 $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 foreach (0..$#freeslot);
1035 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1038 if ($main::please_continue) {
1039 $main::please_continue = 0;
1042 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1044 if (!reapchildren())
1046 check_refresh_wanted();
1048 update_progress_stats();
1049 select (undef, undef, undef, 0.1);
1050 killem (keys %proc) if $main::please_freeze;
1054 update_progress_stats();
1055 freeze_if_want_freeze();
1058 if (!defined $main::success)
1060 if (!@jobstep_todo) {
1062 } elsif ($working_slot_count < 1) {
1063 save_output_collection();
1065 exit(EX_RETRY_UNLOCKED);
1066 } elsif ($thisround_succeeded == 0 &&
1067 ($thisround_failed == 0 || $thisround_failed > 4)) {
1068 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1069 Log (undef, $message);
1074 goto ONELEVEL if !defined $main::success;
1077 release_allocation();
1079 my $collated_output = save_output_collection();
1080 Log (undef, "finish");
1085 if ($collated_output && $main::success) {
1086 $final_state = 'Complete';
1088 $final_state = 'Failed';
1090 $Job->update_attributes('state' => $final_state);
1092 exit (($final_state eq 'Complete') ? 0 : 1);
1096 sub update_progress_stats
1098 $progress_stats_updated = time;
1099 return if !$progress_is_dirty;
1100 my ($todo, $done, $running) = (scalar @jobstep_todo,
1101 scalar @jobstep_done,
1102 scalar @slot - scalar @freeslot - scalar @holdslot);
1103 $Job->{'tasks_summary'} ||= {};
1104 $Job->{'tasks_summary'}->{'todo'} = $todo;
1105 $Job->{'tasks_summary'}->{'done'} = $done;
1106 $Job->{'tasks_summary'}->{'running'} = $running;
1107 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
1108 Log (undef, "status: $done done, $running running, $todo todo");
1109 $progress_is_dirty = 0;
1116 my $pid = waitpid (-1, WNOHANG);
1117 return 0 if $pid <= 0;
1119 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1121 . $slot[$proc{$pid}->{slot}]->{cpu});
1122 my $jobstepid = $proc{$pid}->{jobstep};
1123 my $elapsed = time - $proc{$pid}->{time};
1124 my $Jobstep = $jobstep[$jobstepid];
1126 my $childstatus = $?;
1127 my $exitvalue = $childstatus >> 8;
1128 my $exitinfo = "exit ".exit_status_s($childstatus);
1129 $Jobstep->{'arvados_task'}->reload;
1130 my $task_success = $Jobstep->{'arvados_task'}->{success};
1132 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
1134 if (!defined $task_success) {
1135 # task did not indicate one way or the other --> fail
1136 $Jobstep->{'arvados_task'}->{success} = 0;
1137 $Jobstep->{'arvados_task'}->save;
1144 $temporary_fail ||= $Jobstep->{tempfail};
1145 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1147 ++$thisround_failed;
1148 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1150 # Check for signs of a failed or misconfigured node
1151 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1152 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1153 # Don't count this against jobstep failure thresholds if this
1154 # node is already suspected faulty and srun exited quickly
1155 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1157 Log ($jobstepid, "blaming failure on suspect node " .
1158 $slot[$proc{$pid}->{slot}]->{node}->{name});
1159 $temporary_fail ||= 1;
1161 ban_node_by_slot($proc{$pid}->{slot});
1164 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
1165 ++$Jobstep->{'failures'},
1166 $temporary_fail ? 'temporary' : 'permanent',
1169 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1170 # Give up on this task, and the whole job
1173 # Put this task back on the todo queue
1174 push @jobstep_todo, $jobstepid;
1175 $Job->{'tasks_summary'}->{'failed'}++;
1179 ++$thisround_succeeded;
1180 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1181 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1182 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1183 push @jobstep_done, $jobstepid;
1184 Log ($jobstepid, "success in $elapsed seconds");
1186 $Jobstep->{exitcode} = $childstatus;
1187 $Jobstep->{finishtime} = time;
1188 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1189 $Jobstep->{'arvados_task'}->save;
1190 process_stderr ($jobstepid, $task_success);
1191 Log ($jobstepid, sprintf("task output (%d bytes): %s",
1192 length($Jobstep->{'arvados_task'}->{output}),
1193 $Jobstep->{'arvados_task'}->{output}));
1195 close $reader{$jobstepid};
1196 delete $reader{$jobstepid};
1197 delete $slot[$proc{$pid}->{slot}]->{pid};
1198 push @freeslot, $proc{$pid}->{slot};
1201 if ($task_success) {
1203 my $newtask_list = [];
1204 my $newtask_results;
1206 $newtask_results = api_call(
1209 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1211 'order' => 'qsequence',
1212 'offset' => scalar(@$newtask_list),
1214 push(@$newtask_list, @{$newtask_results->{items}});
1215 } while (@{$newtask_results->{items}});
1216 foreach my $arvados_task (@$newtask_list) {
1218 'level' => $arvados_task->{'sequence'},
1220 'arvados_task' => $arvados_task
1222 push @jobstep, $jobstep;
1223 push @jobstep_todo, $#jobstep;
1227 $progress_is_dirty = 1;
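# check_refresh_wanted: re-read selected Job attributes when the file
# named by $ENV{CRUNCH_REFRESH_TRIGGER} has been touched since our last
# refresh; e.g. (a sketch, assuming that variable is set) an operator
# can run `touch "$CRUNCH_REFRESH_TRIGGER"` to request a refresh.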
1231 sub check_refresh_wanted
1233 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1234 if (@stat && $stat[9] > $latest_refresh) {
1235 $latest_refresh = scalar time;
1236 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1237 for my $attr ('cancelled_at',
1238 'cancelled_by_user_uuid',
1239 'cancelled_by_client_uuid',
1241 $Job->{$attr} = $Job2->{$attr};
1243 if ($Job->{'state'} ne "Running") {
1244 if ($Job->{'state'} eq "Cancelled") {
1245 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1247 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1250 $main::please_freeze = 1;
1257 my $last_squeue_check = $squeue_checked;
1259 # Do not call `squeue` or check the kill list more than once every
1261 return if $last_squeue_check > time - 15;
1262 $squeue_checked = time;
1264 # Look for children from which we haven't received stderr data since
1265 # the last squeue check. If no such children exist, all procs are
1266 # alive and there's no need to even look at squeue.
1268 # As long as the crunchstat poll interval (10s) is shorter than the
1269 # squeue check interval (15s) this should make the squeue check an
1271 my $silent_procs = 0;
1272 for my $jobstep (values %proc)
1274 if ($jobstep->{stderr_at} < $last_squeue_check)
1279 return if $silent_procs == 0;
1281 # use killem() on procs whose killtime is reached
1282 while (my ($pid, $jobstep) = each %proc)
1284 if (exists $jobstep->{killtime}
1285 && $jobstep->{killtime} <= time
1286 && $jobstep->{stderr_at} < $last_squeue_check)
1289 if ($jobstep->{stderr_at}) {
1290 $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
1292 Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1299 # here is an opportunity to check for mysterious problems with local procs
1303 # Get a list of steps still running. Note: squeue(1) says --steps
1304 # selects a format (which we override anyway) and allows us to
1305 # specify which steps we're interested in (which we don't).
1306 # Importantly, it also changes the meaning of %j from "job name" to
1307 # "step name" and (although this isn't mentioned explicitly in the
1308 # docs) switches from "one line per job" mode to "one line per step"
1309 # mode. Without it, we'd just get a list of one job, instead of a list of job steps.
1311 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1314 Log(undef, "warning: squeue exit status $? ($!)");
1319 # which of my jobsteps are running, according to squeue?
1321 for my $jobstepname (@squeue)
1323 $ok{$jobstepname} = 1;
1326 # Check for child procs >60s old and not mentioned by squeue.
1327 while (my ($pid, $jobstep) = each %proc)
1329 if ($jobstep->{time} < time - 60
1330 && $jobstep->{jobstepname}
1331 && !exists $ok{$jobstep->{jobstepname}}
1332 && !exists $jobstep->{killtime})
1334 # According to slurm, this task has ended (successfully or not)
1335 # -- but our srun child hasn't exited. First we must wait (30
1336 # seconds) in case this is just a race between communication
1337 # channels. Then, if our srun child process still hasn't
1338 # terminated, we'll conclude some slurm communication
1339 # error/delay has caused the task to die without notifying srun,
1340 # and we'll kill srun ourselves.
1341 $jobstep->{killtime} = time + 30;
1342 Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
1348 sub release_allocation
1352 Log (undef, "release job allocation");
1353 system "scancel $ENV{SLURM_JOB_ID}";
1361 foreach my $job (keys %reader)
1364 while (0 < sysread ($reader{$job}, $buf, 8192))
1366 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1367 $jobstep[$job]->{stderr_at} = time;
1368 $jobstep[$job]->{stderr} .= $buf;
1369 preprocess_stderr ($job);
1370 if (length ($jobstep[$job]->{stderr}) > 16384)
1372 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1381 sub preprocess_stderr
1385 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1387 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1388 Log ($job, "stderr $line");
1389 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1391 $main::please_freeze = 1;
1393 elsif ($line =~ /srun: error: Node failure on/) {
1394 my $job_slot_index = $jobstep[$job]->{slotindex};
1395 $slot[$job_slot_index]->{node}->{fail_count}++;
1396 $jobstep[$job]->{tempfail} = 1;
1397 ban_node_by_slot($job_slot_index);
1399 elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
1400 $jobstep[$job]->{tempfail} = 1;
1401 ban_node_by_slot($jobstep[$job]->{slotindex});
1403 elsif ($line =~ /arvados\.errors\.Keep/) {
1404 $jobstep[$job]->{tempfail} = 1;
1413 my $task_success = shift;
1414 preprocess_stderr ($job);
1417 Log ($job, "stderr $_");
1418 } split ("\n", $jobstep[$job]->{stderr});
1425 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1426 Log(undef, "fetch_block run error from arv-get $hash: $!");
1429 my $output_block = "";
1432 my $bytes = sysread($keep, $buf, 1024 * 1024);
1433 if (!defined $bytes) {
1434 Log(undef, "fetch_block read error from arv-get: $!");
1435 $output_block = undef;
1437 } elsif ($bytes == 0) {
1438 # sysread returns 0 at the end of the pipe.
1441 # some bytes were read into buf.
1442 $output_block .= $buf;
1447 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1448 $output_block = undef;
1450 return $output_block;
1453 # Create a collection by concatenating the output of all tasks (each
1454 # task's output is either a manifest fragment, a locator for a
1455 # manifest fragment stored in Keep, or nothing at all). Return the
1456 # portable_data_hash of the new collection.
1457 sub create_output_collection
1459 Log (undef, "collate");
1461 my ($child_out, $child_in);
1462 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1465 print (arvados.api("v1").collections().
1466 create(body={"manifest_text": sys.stdin.read()}).
1467 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1471 my $manifest_size = 0;
1475 my $output = $_->{'arvados_task'}->{output};
1476 next if (!defined($output));
1478 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1479 $next_write = fetch_block($output);
1481 $next_write = $output;
1483 if (defined($next_write)) {
1484 if (!defined(syswrite($child_in, $next_write))) {
1485 # There's been an error writing. Stop the loop.
1486 # We'll log details about the exit code later.
1489 $manifest_size += length($next_write);
1492 my $uuid = $_->{'arvados_task'}->{'uuid'};
1493 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1498 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1501 my $s = IO::Select->new($child_out);
1502 if ($s->can_read(120)) {
1503 sysread($child_out, $joboutput, 1024 * 1024);
1506 Log(undef, "output collection creation exited " . exit_status_s($?));
1512 Log (undef, "timed out while creating output collection");
1513 foreach my $signal (2, 2, 2, 15, 15, 9) {
1514 kill($signal, $pid);
1515 last if waitpid($pid, WNOHANG) == -1;
1524 # Calls create_output_collection, logs the result, and returns it.
1525 # If that was successful, save that as the output in the job record.
1526 sub save_output_collection {
1527 my $collated_output = create_output_collection();
1529 if (!$collated_output) {
1530 Log(undef, "Failed to write output collection");
1533 Log(undef, "job output $collated_output");
1534 $Job->update_attributes('output' => $collated_output);
1536 return $collated_output;
1543 my $sig = 2; # SIGINT first
1544 if (exists $proc{$_}->{"sent_$sig"} &&
1545 time - $proc{$_}->{"sent_$sig"} > 4)
1547 $sig = 15; # SIGTERM if SIGINT doesn't work
1549 if (exists $proc{$_}->{"sent_$sig"} &&
1550 time - $proc{$_}->{"sent_$sig"} > 4)
1552 $sig = 9; # SIGKILL if SIGTERM doesn't work
1554 if (!exists $proc{$_}->{"sent_$sig"})
1556 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1558 select (undef, undef, undef, 0.1);
1561 kill $sig, $_; # srun wants two SIGINT to really interrupt
1563 $proc{$_}->{"sent_$sig"} = time;
1564 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1574 vec($bits,fileno($_),1) = 1;
1580 # Send log output to Keep via arv-put.
1582 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1583 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1584 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1585 # $log_pipe_pid is the pid of the arv-put subprocess.
1587 # The only functions that should access these variables directly are:
1589 # log_writer_start($logfilename)
1590 # Starts an arv-put pipe, reading data on stdin and writing it to
1591 # a $logfilename file in an output collection.
1593 # log_writer_read_output([$timeout])
1594 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1595 # Passes $timeout to the select() call, with a default of 0.01.
1596 # Returns the result of the last read() call on $log_pipe_out, or
1597 # -1 if read() wasn't called because select() timed out.
1598 # Only other log_writer_* functions should need to call this.
1600 # log_writer_send($txt)
1601 # Writes $txt to the output log collection.
1603 # log_writer_finish()
1604 # Closes the arv-put pipe and returns the output that it produces.
1606 # log_writer_is_active()
1607 # Returns a true value if there is currently a live arv-put
1608 # process, false otherwise.
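# Typical call sequence (a sketch):
#   log_writer_start("$job_id.log.txt");       # spawn the arv-put pipe
#   log_writer_send($datetime." ".$message);   # append to the log file
#   my $manifest = log_writer_finish();        # close pipe, collect manifest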
1610 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1613 sub log_writer_start($)
1615 my $logfilename = shift;
1616 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1620 '--filename', $logfilename,
1622 $log_pipe_out_buf = "";
1623 $log_pipe_out_select = IO::Select->new($log_pipe_out);
1626 sub log_writer_read_output {
1627 my $timeout = shift || 0.01;
1629 while ($read && $log_pipe_out_select->can_read($timeout)) {
1630 $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1631 length($log_pipe_out_buf));
1633 if (!defined($read)) {
1634 Log(undef, "error reading log manifest from arv-put: $!");
1639 sub log_writer_send($)
1642 print $log_pipe_in $txt;
1643 log_writer_read_output();
1646 sub log_writer_finish()
1648 return unless $log_pipe_pid;
1650 close($log_pipe_in);
1652 my $read_result = log_writer_read_output(120);
1653 if ($read_result == -1) {
1654 Log (undef, "timed out reading from 'arv-put'");
1655 } elsif ($read_result != 0) {
1656 Log(undef, "failed to read arv-put log manifest to EOF");
1659 waitpid($log_pipe_pid, 0);
1661 Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1664 close($log_pipe_out);
1665 my $arv_put_output = $log_pipe_out_buf;
1666 $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1667 $log_pipe_out_select = undef;
1669 return $arv_put_output;
1672 sub log_writer_is_active() {
1673 return $log_pipe_pid;
1676 sub Log # ($jobstep_id, $logmessage)
1678 if ($_[1] =~ /\n/) {
1679 for my $line (split (/\n/, $_[1])) {
1684 my $fh = select STDERR; $|=1; select $fh;
1685 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1686 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1689 if (log_writer_is_active() || -t STDERR) {
1690 my @gmtime = gmtime;
1691 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1692 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1694 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1696 if (log_writer_is_active()) {
1697 log_writer_send($datetime . " " . $message);
1704 my ($package, $file, $line) = caller;
1705 my $message = "@_ at $file line $line\n";
1706 Log (undef, $message);
1707 freeze() if @jobstep_todo;
1708 create_output_collection() if @jobstep_todo;
1718 if ($Job->{'state'} eq 'Cancelled') {
1719 $Job->update_attributes('finished_at' => scalar gmtime);
1721 $Job->update_attributes('state' => 'Failed');
1728 my $justcheckpoint = shift; # false if this will be the last meta saved
1729 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1730 return unless log_writer_is_active();
1732 my $log_manifest = "";
1734 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1735 $log_manifest .= $prev_log_coll->{manifest_text};
1737 $log_manifest .= log_writer_finish();
1739 my $log_coll = api_call(
1740 "collections/create", ensure_unique_name => 1, collection => {
1741 manifest_text => $log_manifest,
1742 owner_uuid => $Job->{owner_uuid},
1743 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1745 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1746 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1750 sub freeze_if_want_freeze
1752 if ($main::please_freeze)
1754 release_allocation();
1757 # kill some srun procs before freeze+stop
1758 $proc{$_} = {} foreach @_;
1761 killem (keys %proc);
1762 select (undef, undef, undef, 0.1);
1764 while (($died = waitpid (-1, WNOHANG)) > 0)
1766 delete $proc{$died};
1771 create_output_collection();
1781 Log (undef, "Freeze not implemented");
1788 croak ("Thaw not implemented");
1804 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1811 my $srunargs = shift;
1812 my $execargs = shift;
1813 my $opts = shift || {};
1815 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1817 $Data::Dumper::Terse = 1;
1818 $Data::Dumper::Indent = 0;
1819 my $show_cmd = Dumper($args);
1820 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
1821 $show_cmd =~ s/\n/ /g;
1822 if ($opts->{fork}) {
1823 Log(undef, "starting: $show_cmd");
1825 # This is a child process: parent is in charge of reading our
1826 # stderr and copying it to Log() if needed.
1827 warn "starting: $show_cmd\n";
1830 if (defined $stdin) {
1831 my $child = open STDIN, "-|";
1832 defined $child or die "no fork: $!";
1834 print $stdin or die $!;
1835 close STDOUT or die $!;
1840 return system (@$args) if $opts->{fork};
1843 warn "ENV size is ".length(join(" ",%ENV));
1844 die "exec failed: $!: @$args";
1848 sub ban_node_by_slot {
1849 # Don't start any new jobsteps on this node for 60 seconds
1851 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1852 $slot[$slotid]->{node}->{hold_count}++;
1853 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1858 my ($lockfile, $error_message) = @_;
1859 open L, ">", $lockfile or croak("$lockfile: $!");
1860 if (!flock L, LOCK_EX|LOCK_NB) {
1861 croak("Can't lock $lockfile: $error_message\n");
1865 sub find_docker_image {
1866 # Given a Keep locator, check to see if it contains a Docker image.
1867 # If so, return its stream name and Docker hash.
1868 # If not, return undef for both values.
1869 my $locator = shift;
1870 my ($streamname, $filename);
1871 my $image = api_call("collections/get", uuid => $locator);
1873 foreach my $line (split(/\n/, $image->{manifest_text})) {
1874 my @tokens = split(/\s+/, $line);
1876 $streamname = shift(@tokens);
1877 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1878 if (defined($filename)) {
1879 return (undef, undef); # More than one file in the Collection.
1881 $filename = (split(/:/, $filedata, 3))[2];
1886 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1887 return ($streamname, $1);
1889 return (undef, undef);
1894 # Calculate the number of times an operation should be retried,
1895 # assuming exponential backoff, and that we're willing to retry as
1896 # long as tasks have been running. Enforce a minimum of 3 retries.
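# For example (a sketch): tasks running for ~2 hours give timediff =
# 7200 and retries = int(log(7200)/log(2)) = 12, while a job that has
# just started falls back to the minimum of 3.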
1897 my ($starttime, $endtime, $timediff, $retries);
1899 $starttime = $jobstep[0]->{starttime};
1900 $endtime = $jobstep[-1]->{finishtime};
1902 if (!defined($starttime)) {
1904 } elsif (!defined($endtime)) {
1905 $timediff = time - $starttime;
1907 $timediff = ($endtime - $starttime) - (time - $endtime);
1909 if ($timediff > 0) {
1910 $retries = int(log($timediff) / log(2));
1912 $retries = 1; # Use the minimum.
1914 return ($retries > 3) ? $retries : 3;
1918 # Pass in two function references.
1919 # This method will be called with the remaining arguments.
1920 # If it dies, retry it with exponential backoff until it succeeds,
1921 # or until the current retry_count is exhausted. After each failure
1922 # that can be retried, the second function will be called with
1923 # the current try count (0-based), next try time, and error message.
1924 my $operation = shift;
1925 my $retry_callback = shift;
1926 my $retries = retry_count();
1927 foreach my $try_count (0..$retries) {
1928 my $next_try = time + (2 ** $try_count);
1929 my $result = eval { $operation->(@_); };
1932 } elsif ($try_count < $retries) {
1933 $retry_callback->($try_count, $next_try, $@);
1934 my $sleep_time = $next_try - time;
1935 sleep($sleep_time) if ($sleep_time > 0);
1938 # Ensure the error message ends in a newline, so Perl doesn't add
1939 # retry_op's line number to it.
1945 # Pass in a /-separated API method name, and arguments for it.
1946 # This function will call that method, retrying as needed until
1947 # the current retry_count is exhausted, with a log on the first failure.
1948 my $method_name = shift;
1949 my $log_api_retry = sub {
1950 my ($try_count, $next_try_at, $errmsg) = @_;
1951 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
1952 $errmsg =~ s/\s/ /g;
1953 $errmsg =~ s/\s+$//;
1955 if ($next_try_at < time) {
1956 $retry_msg = "Retrying.";
1958 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
1959 $retry_msg = "Retrying at $next_try_fmt.";
1961 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
1964 foreach my $key (split(/\//, $method_name)) {
1965 $method = $method->{$key};
1967 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
1971 # Given a $?, return a human-readable exit code string like "0" or
1972 # "1" or "0 with signal 1" or "1 with signal 11".
1973 my $exitcode = shift;
1974 my $s = $exitcode >> 8;
1975 if ($exitcode & 0x7f) {
1976 $s .= " with signal " . ($exitcode & 0x7f);
1978 if ($exitcode & 0x80) {
1979 $s .= " with core dump";
1984 sub handle_readall {
1985 # Pass in a glob reference to a file handle.
1986 # Read all its contents and return them as a string.
1987 my $fh_glob_ref = shift;
1989 return <$fh_glob_ref>;
1992 sub tar_filename_n {
1994 return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
1997 sub add_git_archive {
1998 # Pass in a git archive command as a string or list, a la system().
1999 # This method will save its output to be included in the archive sent to the build script.
2003 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2004 croak("Failed to save git archive: $!");
2006 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2008 waitpid($git_pid, 0);
2011 croak("Failed to save git archive: git exited " . exit_status_s($?));
2015 sub combined_git_archive {
2016 # Combine all saved tar archives into a single archive, then return its
2017 # contents in a string. Return undef if no archives have been saved.
2018 if ($git_tar_count < 1) {
2021 my $base_tar_name = tar_filename_n(1);
2022 foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
2023 my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
2024 if ($tar_exit != 0) {
2025 croak("Error preparing build archive: tar -A exited " .
2026 exit_status_s($tar_exit));
2029 if (!open(GIT_TAR, "<", $base_tar_name)) {
2030 croak("Could not open build archive: $!");
2032 my $tar_contents = handle_readall(\*GIT_TAR);
2034 return $tar_contents;
2037 sub set_nonblocking {
2039 my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
2040 fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
2046 # This is crunch-job's internal dispatch script. crunch-job running on the API
2047 # server invokes this script on individual compute nodes, or localhost if we're
2048 # running a job locally. It gets called in two modes:
2050 # * No arguments: Installation mode. Read a tar archive from the DATA
2051 # file handle; it includes the Crunch script's source code, and
2052 # maybe SDKs as well. Those should be installed in the proper
2053 # locations. This runs outside of any Docker container, so don't try to
2054 # introspect Crunch's runtime environment.
2056 # * With arguments: Crunch script run mode. This script should set up the
2057 # environment, then run the command specified in the arguments. This runs
2058 # inside the job's Docker container, if there is one.
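# A sketch of the two invocations used earlier in this file: the install
# phase pipes this script plus the git archive to "perl -" with no
# arguments, while each task runs
# "perl - $CRUNCH_SRC/crunch_scripts/<script>" so that @ARGV selects
# run mode.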
2061 use File::Path qw( make_path remove_tree );
2062 use POSIX qw(getcwd);
2064 use constant TASK_TEMPFAIL => 111;
2066 # Map SDK subdirectories to the path environments they belong to.
2067 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
2069 my $destdir = $ENV{"CRUNCH_SRC"};
2070 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2071 my $repo = $ENV{"CRUNCH_SRC_URL"};
2072 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2073 my $job_work = $ENV{"JOB_WORK"};
2074 my $task_work = $ENV{"TASK_WORK"};
2076 open(STDOUT_ORIG, ">&", STDOUT);
2077 open(STDERR_ORIG, ">&", STDERR);
2079 for my $dir ($destdir, $job_work, $task_work) {
2082 -e $dir or die "Failed to create temporary directory ($dir): $!";
2087 remove_tree($task_work, {keep_root => 1});
2090 ### Crunch script run mode
2092 # We want to do routine logging during task 0 only. This gives the user
2093 # the information they need, but avoids repeating the information for every task.
2096 if ($ENV{TASK_SEQUENCE} eq "0") {
2099 printf STDERR_ORIG "[Crunch] $msg\n", @_;
2105 my $python_src = "$install_dir/python";
2106 my $venv_dir = "$job_work/.arvados.venv";
2107 my $venv_built = -e "$venv_dir/bin/activate";
2108 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2109 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2110 "--python=python2.7", $venv_dir);
2111 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2113 $Log->("Built Python SDK virtualenv");
2116 my $pip_bin = "pip";
2118 $Log->("Running in Python SDK virtualenv");
2119 $pip_bin = "$venv_dir/bin/pip";
2120 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2121 @ARGV = ("/bin/sh", "-ec",
2122 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
2123 } elsif (-d $python_src) {
2124 $Log->("Warning: virtualenv not found inside Docker container default " .
2125 "\$PATH. Can't install Python SDK.");
2128 my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
2130 $Log->("Using Arvados SDK:");
2131 foreach my $line (split /\n/, $pkgs) {
2135 $Log->("Arvados SDK packages not found");
2138 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2139 my $sdk_path = "$install_dir/$sdk_dir";
2141 if ($ENV{$sdk_envkey}) {
2142 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2144 $ENV{$sdk_envkey} = $sdk_path;
2146 $Log->("Arvados SDK added to %s", $sdk_envkey);
2151 die "Cannot exec `@ARGV`: $!";
2154 ### Installation mode
2155 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
2157 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2158 # This exact git archive (source + arvados sdk) is already installed
2159 # here, so there's no need to reinstall it.
2161 # We must consume our DATA section, though: otherwise the process
2162 # feeding it to us will get SIGPIPE.
2164 while (read(DATA, $buf, 65536)) { }
2169 unlink "$destdir.archive_hash";
2173 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2174 local $SIG{PIPE} = "IGNORE";
2175 warn "Extracting archive: $archive_hash\n";
2176 # --ignore-zeros is necessary sometimes: depending on how much NUL
2177 # padding tar -A put on our combined archive (which in turn depends
2178 # on the length of the component archives) tar without
2179 # --ignore-zeros will exit before consuming stdin and cause close()
2180 # to fail on the resulting SIGPIPE.
2181 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2182 die "Error launching 'tar -xC $destdir': $!";
2184 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2185 # get SIGPIPE. We must feed it data incrementally.
2187 while (read(DATA, $tar_input, 65536)) {
2188 print TARX $tar_input;
2191 die "'tar -xC $destdir' exited $?: $!";
2197 my $sdk_root = "$destdir/.arvados.sdk/sdk";
2199 foreach my $sdk_lang (("python",
2200 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
2201 if (-d "$sdk_root/$sdk_lang") {
2202 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2203 die "Failed to install $sdk_lang SDK: $!";
2209 my $python_dir = "$install_dir/python";
2210 if ((-d $python_dir) and can_run("python2.7")) {
2211 open(my $egg_info_pipe, "-|",
2212 "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
2213 my @egg_info_errors = <$egg_info_pipe>;
2214 close($egg_info_pipe);
2216 if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
2217 # egg_info apparently failed because it couldn't ask git for a build tag.
2218 # Specify no build tag.
2219 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2220 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
2223 my $egg_info_exit = $? >> 8;
2224 foreach my $errline (@egg_info_errors) {
2225 print STDERR_ORIG $errline;
2227 warn "python setup.py egg_info failed: exit $egg_info_exit";
2228 exit ($egg_info_exit || 1);
2233 # Hide messages from the install script (unless it fails: shell_or_die
2234 # will show $destdir.log in that case).
2235 open(STDOUT, ">>", "$destdir.log");
2236 open(STDERR, ">&", STDOUT);
2238 if (-e "$destdir/crunch_scripts/install") {
2239 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2240 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2242 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2243 } elsif (-e "./install.sh") {
2244 shell_or_die (undef, "./install.sh", $install_dir);
2247 if ($archive_hash) {
2248 unlink "$destdir.archive_hash.new";
2249 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2250 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
2256 my $command_name = shift;
2257 open(my $which, "-|", "which", $command_name);
2258 while (<$which>) { }
2265 my $exitcode = shift;
2267 if ($ENV{"DEBUG"}) {
2268 print STDERR "@_\n";
2270 if (system (@_) != 0) {
2273 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
2274 open STDERR, ">&STDERR_ORIG";
2275 system ("cat $destdir.log >&2");
2276 warn "@_ failed ($err): $exitstatus";
2277 if (defined($exitcode)) {
2281 exit (($code >> 8) || 1);