2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
10 crunch-job: Execute job steps, save snapshots as requested, collate output.
14 Obtain job details from Arvados, run tasks on compute nodes (typically
15 invoked by scheduler on controller):
17 crunch-job --job x-y-z --git-dir /path/to/repo/.git
19 Obtain job details from command line, run tasks on local machine
20 (typically invoked by application or developer on VM):
22 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
24 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
32 If the job is already locked, steal the lock and run it anyway.
36 Path to a .git directory (or a git URL) where the commit given in the
37 job's C<script_version> attribute is to be found. If this is I<not>
38 given, the job's C<repository> attribute will be used.
42 Arvados API authorization token to use during the course of the job.
46 Do not clear per-job/task temporary directories during initial job
47 setup. This can speed up development and debugging when running jobs
52 UUID of the job to run, or a JSON-encoded job resource without a
53 UUID. If the latter is given, a new job object will be created.
57 =head1 RUNNING JOBS LOCALLY
59 crunch-job's log messages appear on stderr along with the job tasks'
60 stderr streams. The log is saved in Keep at each checkpoint and when
63 If the job succeeds, the job's output locator is printed on stdout.
65 While the job is running, the following signals are accepted:
69 =item control-C, SIGINT, SIGQUIT
71 Save a checkpoint, terminate any job tasks that are running, and stop.
75 Save a checkpoint and continue.
79 Refresh node allocation (i.e., check whether any nodes have been added
80 or unallocated) and attributes of the Job record that should affect
81 behavior (e.g., cancel job if cancelled_at becomes non-nil).
89 use POSIX ':sys_wait_h';
90 use POSIX qw(strftime);
91 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
95 use Digest::MD5 qw(md5_hex);
101 use File::Path qw( make_path remove_tree );
103 use constant TASK_TEMPFAIL => 111;
104 use constant EX_TEMPFAIL => 75;
105 use constant EX_RETRY_UNLOCKED => 93;
# Establish the scratch-directory environment used by the rest of the
# script: TMPDIR, CRUNCH_TMP, JOB_WORK, CRUNCH_INSTALL and CRUNCH_WORK.
# TMPDIR falls back to /tmp when it is unset or otherwise false.
107 $ENV{"TMPDIR"} ||= "/tmp";
# Only derive CRUNCH_TMP when the caller has not already supplied one.
108 unless (defined $ENV{"CRUNCH_TMP"}) {
109 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# $< is the real UID. The "-UID" suffix is skipped when USER is "crunch"
# or when running as UID 0.
# NOTE(review): $ENV{"USER"} may be undefined in some environments, in
# which case `ne` compares against undef -- confirm whether that matters.
110 if ($ENV{"USER"} ne "crunch" && $< != 0) {
111 # use a tmp dir unique for my uid
112 $ENV{"CRUNCH_TMP"} .= "-$<";
116 # Create the tmp directory if it does not exist
117 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
# make_path comes from File::Path; a false return value aborts the script.
118 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# JOB_WORK and CRUNCH_INSTALL both live directly under CRUNCH_TMP.
121 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
122 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
123 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# The result of mkdir is not checked here.
124 mkdir ($ENV{"JOB_WORK"});
# Defaults for the options that have one; each can be overridden on the
# command line through the GetOptions call below.
133 my $cgroup_root = "/sys/fs/cgroup";
134 my $docker_bin = "docker.io";
# NOTE(review): $docker_run_args is later interpolated unquoted into the
# "docker run" command lines, so its contents are split by the shell.
135 my $docker_run_args = "";
# Options declared with "=s" take a string value; the others are flags.
136 GetOptions('force-unlock' => \$force_unlock,
137 'git-dir=s' => \$git_dir,
138 'job=s' => \$jobspec,
139 'job-api-token=s' => \$job_api_token,
140 'no-clear-tmp' => \$no_clear_tmp,
141 'resume-stash=s' => \$resume_stash,
142 'cgroup-root=s' => \$cgroup_root,
143 'docker-bin=s' => \$docker_bin,
144 'docker-run-args=s' => \$docker_run_args,
# A token given with --job-api-token replaces ARVADOS_API_TOKEN in the
# environment.
147 if (defined $job_api_token) {
148 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# True only when both SLURM_JOB_ID and SLURM_NODELIST exist in the environment.
151 my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
156 $main::ENV{CRUNCH_DEBUG} = 1;
160 $main::ENV{CRUNCH_DEBUG} = 0;
163 my $arv = Arvados->new('apiVersion' => 'v1');
172 if ($jobspec =~ /^[-a-z\d]+$/)
174 # $jobspec is an Arvados UUID, not a JSON job specification
175 $Job = api_call("jobs/get", uuid => $jobspec);
180 $local_job = JSON::decode_json($jobspec);
184 # Make sure our workers (our slurm nodes, localhost, or whatever) are
185 # at least able to run basic commands: they aren't down or severely
188 if (($Job || $local_job)->{docker_image_locator}) {
189 $cmd = [$docker_bin, 'ps', '-q'];
191 Log(undef, "Sanity check is `@$cmd`");
192 my ($exited, $stdout, $stderr) = srun_sync(
193 ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
195 {label => "sanity check"});
197 Log(undef, "Sanity check failed: ".exit_status_s($exited));
200 Log(undef, "Sanity check OK");
203 my $User = api_call("users/current");
206 if (!$force_unlock) {
207 # Claim this job, and make sure nobody else does
208 eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
210 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
219 map { croak ("No $_ specified") unless $local_job->{$_} }
220 qw(script script_version script_parameters);
223 $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
224 $local_job->{'started_at'} = gmtime;
225 $local_job->{'state'} = 'Running';
227 $Job = api_call("jobs/create", job => $local_job);
229 $job_id = $Job->{'uuid'};
231 my $keep_logfile = $job_id . '.log.txt';
232 log_writer_start($keep_logfile);
234 $Job->{'runtime_constraints'} ||= {};
235 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
236 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build a " with arvados-cli Gem version(s) ..." suffix for the startup
# log line from the output of `gem list`. stderr from gem is discarded.
238 my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
# Replace the leading "arvados-cli (" with the human-readable prefix.
240 $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
# chomp drops the trailing newline; chop then removes the last remaining
# character unconditionally.
241 chomp($gem_versions);
242 chop($gem_versions); # Closing parenthesis
247 "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
249 Log (undef, "check slurm allocation");
252 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
256 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
257 push @sinfo, "$localcpus localhost";
259 if (exists $ENV{SLURM_NODELIST})
261 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
265 my ($ncpus, $slurm_nodelist) = split;
266 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
269 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
272 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
275 foreach (split (",", $ranges))
288 push @nodelist, map {
290 $n =~ s/\[[-,\d]+\]/$_/;
297 push @nodelist, $nodelist;
300 foreach my $nodename (@nodelist)
302 Log (undef, "node $nodename - $ncpus slots");
303 my $node = { name => $nodename,
305 # The number of consecutive times a task has been dispatched
306 # to this node and failed.
308 # The number of consecutive times that SLURM has reported
309 # a node failure since the last successful task.
311 # Don't dispatch work to this node until this time
312 # (in seconds since the epoch) has passed.
314 foreach my $cpu (1..$ncpus)
316 push @slot, { node => $node,
320 push @node, @nodelist;
325 # Ensure that we get one jobstep running on each allocated node before
326 # we start overloading nodes with concurrent steps
328 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
331 $Job->update_attributes(
332 'tasks_summary' => { 'failed' => 0,
# Install signal handlers. Apart from TERM, each handler only sets a
# $main::please_* flag, so the signal handler itself does no other work.
337 Log (undef, "start");
# INT, QUIT and TSTP all set the same "freeze" flag.
338 $SIG{'INT'} = sub { $main::please_freeze = 1; };
339 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
# TERM calls croak directly instead of setting a flag.
340 $SIG{'TERM'} = \&croak;
341 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
342 $SIG{'ALRM'} = sub { $main::please_info = 1; };
343 $SIG{'CONT'} = sub { $main::please_continue = 1; };
344 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
# Start with every request flag cleared.
346 $main::please_freeze = 0;
347 $main::please_info = 0;
348 $main::please_continue = 0;
349 $main::please_refresh = 0;
# Export job identity into the environment and declare the bookkeeping
# variables used while dispatching job steps.
350 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy every "NOCACHE...=value" line of the job's knobs into %ENV.
# grep is used purely for its side effect; its result is discarded.
352 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
# The job UUID is exported under two names.
353 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
354 $ENV{"JOB_UUID"} = $job_id;
# @jobstep_todo holds indexes into @jobstep (entries are used as
# $jobstep[$id] elsewhere in this script).
357 my @jobstep_todo = ();
358 my @jobstep_done = ();
359 my @jobstep_tomerge = ();
360 my $jobstep_tomerge_level = 0;
361 my $squeue_checked = 0;
362 my $sinfo_checked = 0;
# Epoch seconds; compared against the current time in the dispatch loop
# to decide when to call check_refresh_wanted().
363 my $latest_refresh = scalar time;
367 if (defined $Job->{thawedfromkey})
369 thaw ($Job->{thawedfromkey});
373 my $first_task = api_call("job_tasks/create", job_task => {
374 'job_uuid' => $Job->{'uuid'},
379 push @jobstep, { 'level' => 0,
381 'arvados_task' => $first_task,
383 push @jobstep_todo, 0;
389 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
392 my $build_script = handle_readall(\*DATA);
393 my $nodelist = join(",", @node);
394 my $git_tar_count = 0;
396 if (!defined $no_clear_tmp) {
397 # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
398 # up work directories crunch_tmp/work, crunch_tmp/opt,
400 my ($exited, $stdout, $stderr) = srun_sync(
401 ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
403 arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
404 rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
406 {label => "clean work dirs"});
408 exit(EX_RETRY_UNLOCKED);
412 # If this job requires a Docker image, install that.
413 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
414 if ($docker_locator = $Job->{docker_image_locator}) {
415 Log (undef, "Install docker image $docker_locator");
416 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
419 croak("No Docker image hash found from locator $docker_locator");
421 Log (undef, "docker image hash is $docker_hash");
422 $docker_stream =~ s/^\.//;
423 my $docker_install_script = qq{
425 id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
426 echo "image ID is \$id"
427 [[ \${id} = \Q$docker_hash\E ]]
429 if loaded >&2 2>/dev/null; then
430 echo >&2 "image is already present"
433 echo >&2 "docker image is not present; loading"
434 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
435 if ! loaded >&2; then
# Single quotes keep bash from treating the backticks as a command substitution.
436 echo >&2 '`docker load` exited 0, but image is not found (!)'
439 echo >&2 "image loaded successfully"
442 my ($exited, $stdout, $stderr) = srun_sync(
443 ["srun", "--nodelist=" . join(',', @node)],
444 ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
445 {label => "load docker image"});
448 exit(EX_RETRY_UNLOCKED);
451 # Determine whether this version of Docker supports memory+swap limits.
452 ($exited, $stdout, $stderr) = srun_sync(
453 ["srun", "--nodes=1"],
454 [$docker_bin, 'run', '--help'],
455 {label => "check --memory-swap feature"});
456 $docker_limitmem = ($stdout =~ /--memory-swap/);
458 # Find a non-root Docker user to use.
459 # Tries the default user for the container, then 'crunch', then 'nobody',
460 # testing for whether the actual user id is non-zero. This defends against
461 # mistakes but not malice, but we intend to harden the security in the future
462 # so we don't want anyone getting used to their jobs running as root in their
464 my @tryusers = ("", "crunch", "nobody");
465 foreach my $try_user (@tryusers) {
468 if ($try_user eq "") {
469 $label = "check whether default user is UID 0";
472 $label = "check whether user '$try_user' is UID 0";
473 $try_user_arg = "--user=$try_user";
475 my ($exited, $stdout, $stderr) = srun_sync(
476 ["srun", "--nodes=1"],
478 "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
481 if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
482 $dockeruserarg = $try_user_arg;
483 if ($try_user eq "") {
484 Log(undef, "Container will run with default user");
486 Log(undef, "Container will run with $dockeruserarg");
492 if (!defined $dockeruserarg) {
493 croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
# Optionally bundle the Arvados SDK sources with the job's source archive.
496 if ($Job->{arvados_sdk_version}) {
497 # The job also specifies an Arvados SDK version. Add the SDKs to the
498 # tar file for the build script to install.
499 Log(undef, sprintf("Packing Arvados SDK version %s for installation",
500 $Job->{arvados_sdk_version}));
# Archives the "sdk" path at the requested version, with every entry
# placed under the ".arvados.sdk/" prefix.
# NOTE(review): $git_dir is interpolated here, yet the next visible
# statement tests `!defined $git_dir`, so it can be undefined when
# --git-dir was not given -- confirm what is intended in that case.
501 add_git_archive("git", "--git-dir=$git_dir", "archive",
502 "--prefix=.arvados.sdk/",
503 $Job->{arvados_sdk_version}, "sdk");
507 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
508 # If script_version looks like an absolute path, *and* the --git-dir
509 # argument was not given -- which implies we were not invoked by
510 # crunch-dispatch -- we will use the given path as a working
511 # directory instead of resolving script_version to a git commit (or
512 # doing anything else with git).
513 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
514 $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
517 # Resolve the given script_version to a git commit sha1. Also, if
518 # the repository is remote, clone it into our local filesystem: this
519 # ensures "git archive" will work, and is necessary to reliably
520 # resolve a symbolic script_version like "master^".
521 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
523 Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
525 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
527 # If we're running under crunch-dispatch, it will have already
528 # pulled the appropriate source tree into its own repository, and
529 # given us that repo's path as $git_dir.
531 # If we're running a "local" job, we might have to fetch content
532 # from a remote repository.
534 # (Currently crunch-dispatch gives a local path with --git-dir, but
535 # we might as well accept URLs there too in case it changes its
537 my $repo = $git_dir || $Job->{'repository'};
539 # Repository can be remote or local. If remote, we'll need to fetch it
540 # to a local dir before doing `git log` et al.
543 if ($repo =~ m{://|^[^/]*:}) {
544 # $repo is a git url we can clone, like git:// or https:// or
545 # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
546 # not recognized here because distinguishing that from a local
547 # path is too fragile. If you really need something strange here,
548 # use the ssh:// form.
549 $repo_location = 'remote';
550 } elsif ($repo =~ m{^\.*/}) {
551 # $repo is a local path to a git index. We'll also resolve ../foo
552 # to ../foo/.git if the latter is a directory. To help
553 # disambiguate local paths from named hosted repositories, this
554 # form must be given as ./ or ../ if it's a relative path.
555 if (-d "$repo/.git") {
556 $repo = "$repo/.git";
558 $repo_location = 'local';
560 # $repo is none of the above. It must be the name of a hosted
562 my $arv_repo_list = api_call("repositories/list",
563 'filters' => [['name','=',$repo]]);
564 my @repos_found = @{$arv_repo_list->{'items'}};
565 my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
567 Log(undef, "Repository '$repo' -> "
568 . join(", ", map { $_->{'uuid'} } @repos_found));
571 croak("Error: Found $n_found repositories with name '$repo'.");
573 $repo = $repos_found[0]->{'fetch_url'};
574 $repo_location = 'remote';
576 Log(undef, "Using $repo_location repository '$repo'");
577 $ENV{"CRUNCH_SRC_URL"} = $repo;
579 # Resolve given script_version (we'll call that $treeish here) to a
580 # commit sha1 ($commit).
581 my $treeish = $Job->{'script_version'};
583 if ($repo_location eq 'remote') {
584 # We minimize excess object-fetching by re-using the same bare
585 # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
586 # just keep adding remotes to it as needed.
587 my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
588 my $gitcmd = "git --git-dir=\Q$local_repo\E";
590 # Set up our local repo for caching remote objects, making
592 if (!-d $local_repo) {
593 make_path($local_repo) or croak("Error: could not create $local_repo");
595 # This works (exits 0 and doesn't delete fetched objects) even
596 # if $local_repo is already initialized:
597 `$gitcmd init --bare`;
599 croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
602 # If $treeish looks like a hash (or abbrev hash) we look it up in
603 # our local cache first, since that's cheaper. (We don't want to
604 # do that with tags/branches though -- those change over time, so
605 # they should always be resolved by the remote repo.)
606 if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
607 # Hide stderr because it's normal for this to fail:
608 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
610 # Careful not to resolve a branch named abcdeff to commit 1234567:
611 $sha1 =~ /^$treeish/ &&
612 $sha1 =~ /^([0-9a-f]{40})$/s) {
614 Log(undef, "Commit $commit already present in $local_repo");
618 if (!defined $commit) {
619 # If $treeish isn't just a hash or abbrev hash, or isn't here
620 # yet, we need to fetch the remote to resolve it correctly.
622 # First, remove all local heads. This prevents a name that does
623 # not exist on the remote from resolving to (or colliding with)
624 # a previously fetched branch or tag (possibly from a different
626 remove_tree("$local_repo/refs/heads", {keep_root => 1});
628 Log(undef, "Fetching objects from $repo to $local_repo");
629 `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
631 croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
635 # Now that the data is all here, we will use our local repo for
636 # the rest of our git activities.
640 my $gitcmd = "git --git-dir=\Q$repo\E";
641 my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
642 unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
643 croak("`$gitcmd rev-list` exited "
645 .", '$treeish' not found, giving up");
648 Log(undef, "Version $treeish is commit $commit");
650 if ($commit ne $Job->{'script_version'}) {
651 # Record the real commit id in the database, frozentokey, logs,
652 # etc. -- instead of an abbreviation or a branch name which can
653 # become ambiguous or point to a different commit in the future.
654 if (!$Job->update_attributes('script_version' => $commit)) {
655 croak("Error: failed to update job's script_version attribute");
659 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
660 add_git_archive("$gitcmd archive ''\Q$commit\E");
663 my $git_archive = combined_git_archive();
664 if (!defined $git_archive) {
665 Log(undef, "Skip install phase (no git archive)");
667 Log(undef, "Warning: This probably means workers have no source tree!");
# Run the build/install script on every worker, retrying up to three
# times when it fails without printing anything of its own.
# NOTE(review): $install_script_tries_left is not referenced in the lines
# visible here; the loop is bounded by $attempts instead.
672 my $install_script_tries_left = 3;
673 for (my $attempts = 0; $attempts < 3; $attempts++) {
674 my @srunargs = ("srun",
675 "--nodelist=$nodelist",
676 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# The install script itself is fed to "perl -" on stdin (see the fourth
# argument to srun_sync below).
# NOTE(review): CRUNCH_INSTALL and CRUNCH_TMP are interpolated into this
# shell command without quoting; paths containing spaces or shell
# metacharacters would be split or interpreted by sh.
677 my @execargs = ("sh", "-c",
678 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
# Export the MD5 of the combined archive under CRUNCH_GIT_ARCHIVE_HASH.
680 $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
681 my ($stdout, $stderr);
# stdin for the remote command is the build script followed by the archive.
682 ($exited, $stdout, $stderr) = srun_sync(
683 \@srunargs, \@execargs,
684 {label => "run install script on all workers"},
685 $build_script . $git_archive);
# Any stderr line that does not start with "srun: error: " or
# "starting: [" counts as output from the install script itself.
687 my $stderr_anything_from_script = 0;
688 for my $line (split(/\n/, $stderr)) {
689 if ($line !~ /^(srun: error: |starting: \[)/) {
690 $stderr_anything_from_script = 1;
# Stop retrying on success, or when a freeze has been requested.
694 last if $exited == 0 || $main::please_freeze;
696 # If the install script fails but doesn't print an error message,
697 # the next thing anyone is likely to do is just run it again in
698 # case it was a transient problem like "slurm communication fails
699 # because the network isn't reliable enough". So we'll just do
700 # that ourselves (up to 3 attempts in total). OTOH, if there is an
701 # error message, the problem is more likely to have a real fix and
702 # we should fail the job so the fixing process can start, instead
703 # of doing 2 more attempts.
704 last if $stderr_anything_from_script;
# Remove the tar files numbered 1..$git_tar_count; unlink's result is
# not checked.
707 foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
708 unlink($tar_filename);
716 foreach (qw (script script_version script_parameters runtime_constraints))
720 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
722 foreach (split (/\n/, $Job->{knobs}))
724 Log (undef, "knob " . $_);
728 'filters' => [['hostname', 'in', \@node]],
729 'order' => 'hostname',
730 'limit' => scalar(@node),
732 for my $n (@{$resp->{items}}) {
733 Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
738 $main::success = undef;
744 my $thisround_succeeded = 0;
745 my $thisround_failed = 0;
746 my $thisround_failed_multiple = 0;
747 my $working_slot_count = scalar(@slot);
749 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
750 or $a <=> $b } @jobstep_todo;
751 my $level = $jobstep[$jobstep_todo[0]]->{level};
753 my $initial_tasks_this_level = 0;
754 foreach my $id (@jobstep_todo) {
755 $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
758 # If the number of tasks scheduled at this level #T is smaller than the number
759 # of slots available #S, only use the first #T slots, or the first slot on
760 # each node, whichever number is greater.
762 # When we dispatch tasks later, we'll allocate whole-node resources like RAM
763 # based on these numbers. Using fewer slots makes more resources available
764 # to each individual task, which should normally be a better strategy when
765 # there are fewer of them running with less parallelism.
767 # Note that this calculation is not redone if the initial tasks at
768 # this level queue more tasks at the same level. This may harm
769 # overall task throughput for that level.
# Choose which slot indexes are usable this round, based on how many
# tasks are initially queued at this level.
# Fewer tasks than nodes: use the first scalar(@node) slots
# (presumably one per node, given that @slot was sorted by cpu number
# earlier -- confirm).
771 if ($initial_tasks_this_level < @node) {
772 @freeslot = (0..$#node);
# Fewer tasks than slots: use exactly as many slots as there are tasks.
773 } elsif ($initial_tasks_this_level < @slot) {
774 @freeslot = (0..$initial_tasks_this_level - 1);
# Otherwise every slot is available.
776 @freeslot = (0..$#slot);
778 my $round_num_freeslots = scalar(@freeslot);
779 print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
# Map node name -> cpu number of the last free slot on that node.
# Walking @freeslot from the end and using ||= keeps the first value
# seen for each node, i.e. the one nearest the end of @freeslot.
# The result is exported per task as CRUNCH_NODE_SLOTS and used as the
# divisor when computing MEMLIMIT.
781 my %round_max_slots = ();
782 for (my $ii = $#freeslot; $ii >= 0; $ii--) {
783 my $this_slot = $slot[$freeslot[$ii]];
784 my $node_name = $this_slot->{node}->{name};
785 $round_max_slots{$node_name} ||= $this_slot->{cpu};
# Stop once there is an entry for as many nodes as exist.
786 last if (scalar(keys(%round_max_slots)) >= @node);
789 Log(undef, "start level $level with $round_num_freeslots slots");
792 my $progress_is_dirty = 1;
793 my $progress_stats_updated = 0;
795 update_progress_stats();
799 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
801 # Don't create new tasks if we already know the job's final result.
802 last if defined($main::success);
804 my $id = $jobstep_todo[$todo_ptr];
805 my $Jobstep = $jobstep[$id];
806 if ($Jobstep->{level} != $level)
811 pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
812 set_nonblocking($reader{$id});
814 my $childslot = $freeslot[0];
815 my $childnode = $slot[$childslot]->{node};
816 my $childslotname = join (".",
817 $slot[$childslot]->{node}->{name},
818 $slot[$childslot]->{cpu});
820 my $childpid = fork();
823 $SIG{'INT'} = 'DEFAULT';
824 $SIG{'QUIT'} = 'DEFAULT';
825 $SIG{'TERM'} = 'DEFAULT';
827 foreach (values (%reader))
831 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
832 open(STDOUT,">&writer");
833 open(STDERR,">&writer");
838 delete $ENV{"GNUPGHOME"};
839 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
840 $ENV{"TASK_QSEQUENCE"} = $id;
841 $ENV{"TASK_SEQUENCE"} = $level;
842 $ENV{"JOB_SCRIPT"} = $Job->{script};
843 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
844 $param =~ tr/a-z/A-Z/;
845 $ENV{"JOB_PARAMETER_$param"} = $value;
847 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
848 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
849 $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
850 $ENV{"HOME"} = $ENV{"TASK_WORK"};
851 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
852 $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
853 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
855 my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
861 "--nodelist=".$childnode->{name},
862 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
863 "--job-name=$job_id.$id.$$",
866 my $stdbuf = " stdbuf --output=0 --error=0 ";
868 my $arv_file_cache = "";
869 if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
870 $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
874 "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
875 ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
876 ."&& cd \Q$ENV{CRUNCH_TMP}\E "
877 # These environment variables get used explicitly later in
878 # $command. No tool is expected to read these values directly.
879 .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
880 .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
881 ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
882 ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
883 .q{&& declare -a VOLUMES=() }
884 .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
885 .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
886 .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
888 $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
889 $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
890 $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
894 my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
895 my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
896 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
897 $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
898 # We only set memory limits if Docker lets us limit both memory and swap.
899 # Memory limits alone have been supported longer, but subprocesses tend
900 # to get SIGKILL if they exceed that without any swap limit set.
901 # See #5642 for additional background.
902 if ($docker_limitmem) {
903 $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
906 # The source tree and $destdir directory (which we have
907 # installed on the worker host) are available in the container,
908 # under the same path.
909 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
910 $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
912 # Currently, we make the "by_pdh" directory in arv-mount's mount
913 # point appear at /keep inside the container (instead of using
914 # the same path as the host like we do with CRUNCH_SRC and
915 # CRUNCH_INSTALL). However, crunch scripts and utilities must
916 # not rely on this. They must use $TASK_KEEPMOUNT.
917 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
918 $ENV{TASK_KEEPMOUNT} = "/keep";
920 # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
921 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
922 $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
924 # TASK_WORK is almost exactly like a docker data volume: it
925 # starts out empty, is writable, and persists until no
926 # containers use it any more. We don't use --volumes-from to
927 # share it with other containers: it is only accessible to this
928 # task, and it goes away when this task stops.
930 # However, a docker data volume is writable only by root unless
931 # the mount point already happens to exist in the container with
932 # different permissions. Therefore, we [1] assume /tmp already
933 # exists in the image and is writable by the crunch user; [2]
934 # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
935 # writable if they are created by docker while setting up the
936 # other --volumes); and [3] create $TASK_WORK inside the
937 # container using $build_script.
938 $command .= "--volume=/tmp ";
939 $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
940 $ENV{"HOME"} = $ENV{"TASK_WORK"};
941 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
943 # TODO: Share a single JOB_WORK volume across all task
944 # containers on a given worker node, and delete it when the job
945 # ends (and, in case that doesn't work, when the next job
948 # For now, use the same approach as TASK_WORK above.
949 $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
951 # Bind mount the crunchrunner binary and host TLS certificates file into
953 $command .= '"${VOLUMES[@]}" ';
955 while (my ($env_key, $env_val) = each %ENV)
957 if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
958 $command .= "--env=\Q$env_key=$env_val\E ";
961 $command .= "--env=\QHOME=$ENV{HOME}\E ";
962 $command .= "\Q$docker_hash\E ";
964 if ($Job->{arvados_sdk_version}) {
966 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
968 $command .= "/bin/sh -c \'python -c " .
969 '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
970 ">&2 2>/dev/null; " .
971 "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
972 "if which stdbuf >/dev/null ; then " .
973 " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
975 " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
980 $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
# Shell-quote the script path with \Q...\E, matching the Docker branch above,
# so spaces or metacharacters in CRUNCH_SRC or the script name survive bash -c.
982 $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
985 my @execargs = ('bash', '-c', $command);
986 srun (\@srunargs, \@execargs, undef, $build_script);
987 # exec() failed, we assume nothing happened.
988 die "srun() failed on build script\n";
991 if (!defined $childpid)
1002 jobstepname => "$job_id.$id.$childpid",
1004 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
1005 $slot[$childslot]->{pid} = $childpid;
1007 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
1008 Log ($id, "child $childpid started on $childslotname");
1009 $Jobstep->{starttime} = time;
1010 $Jobstep->{node} = $childnode->{name};
1011 $Jobstep->{slotindex} = $childslot;
1012 delete $Jobstep->{stderr};
1013 delete $Jobstep->{finishtime};
1014 delete $Jobstep->{tempfail};
1016 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
1017 $Jobstep->{'arvados_task'}->save;
1019 splice @jobstep_todo, $todo_ptr, 1;
1022 $progress_is_dirty = 1;
1026 ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
1028 last THISROUND if $main::please_freeze;
1029 if ($main::please_info)
1031 $main::please_info = 0;
1033 create_output_collection();
1035 update_progress_stats();
1040 if (!$gotsome || ($latest_refresh + 2 < scalar time))
1042 check_refresh_wanted();
1044 update_progress_stats();
1046 elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
1048 update_progress_stats();
1051 select (undef, undef, undef, 0.1);
# Count slots whose node has fail_count == 0 and hold_count below 4.
1053 $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
1054 $_->{node}->{hold_count} < 4 } @slot);
# Give up on the round when repeated failures dominate: at least 8 with
# no successes, or at least 16 and more than the number of successes.
1055 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
1056 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
# The logged ratio is repeated failures over all finished tasks this round.
1058 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
1059 .($thisround_failed+$thisround_succeeded)
1060 .") -- giving up on this round";
1061 Log (undef, $message);
1065 # move slots from freeslot to holdslot (or back to freeslot) if necessary
# Both loops iterate from the end so that splice does not shift the
# positions of entries that have not been visited yet.
1066 for (my $i=$#freeslot; $i>=0; $i--) {
1067 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
1068 push @holdslot, (splice @freeslot, $i, 1);
1071 for (my $i=$#holdslot; $i>=0; $i--) {
1072 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
1073 push @freeslot, (splice @holdslot, $i, 1);
1077 # give up if no nodes are succeeding
1078 if ($working_slot_count < 1) {
1079 Log(undef, "Every node has failed -- giving up");
# Return every held slot to @freeslot (splice with no offset empties
# @holdslot), then reset losing_streak on each free slot's node.
# map is used here only for its side effect.
1086 push @freeslot, splice @holdslot;
1087 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
1090 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
1093 if ($main::please_continue) {
1094 $main::please_continue = 0;
1097 $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
1099 if (!reapchildren())
1101 check_refresh_wanted();
1103 update_progress_stats();
1104 select (undef, undef, undef, 0.1);
1105 killem (keys %proc) if $main::please_freeze;
1109 update_progress_stats();
1110 freeze_if_want_freeze();
1113 if (!defined $main::success)
1115 if (!@jobstep_todo) {
1117 } elsif ($working_slot_count < 1) {
1118 save_output_collection();
1120 exit(EX_RETRY_UNLOCKED);
1121 } elsif ($thisround_succeeded == 0 &&
1122 ($thisround_failed == 0 || $thisround_failed > 4)) {
1123 my $message = "stop because $thisround_failed tasks failed and none succeeded";
1124 Log (undef, $message);
1129 goto ONELEVEL if !defined $main::success;
1132 release_allocation();
1134 my $collated_output = save_output_collection();
1135 Log (undef, "finish");
1137 my $final_log = save_meta();
1140 if ($collated_output && $final_log && $main::success) {
1141 $final_state = 'Complete';
1143 $final_state = 'Failed';
1145 $Job->update_attributes('state' => $final_state);
1147 exit (($final_state eq 'Complete') ? 0 : 1);
# Report task progress (todo / done / running) to the Job record and the log.
#
# The "last updated" timestamp is refreshed on every call, but the API
# update and the status log line are only emitted when something has
# changed since the previous report (i.e. $progress_is_dirty is set).
sub update_progress_stats
{
  $progress_stats_updated = time;
  return unless $progress_is_dirty;

  my %count = (
    todo    => scalar(@jobstep_todo),
    done    => scalar(@jobstep_done),
    running => scalar(keys(%proc)),
  );

  # Overwrite only our three counters; any other key already present in
  # tasks_summary is left untouched.
  my $summary = ($Job->{'tasks_summary'} ||= {});
  $summary->{$_} = $count{$_} for ('todo', 'done', 'running');

  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  Log (undef, "status: $count{done} done, $count{running} running, $count{todo} todo");
  $progress_is_dirty = 0;
}
1171 my $children_reaped = 0;
1172 my @successful_task_uuids = ();
1174 while((my $pid = waitpid (-1, WNOHANG)) > 0)
1176 my $childstatus = $?;
1178 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
1180 . $slot[$proc{$pid}->{slot}]->{cpu});
1181 my $jobstepidx = $proc{$pid}->{jobstepidx};
1184 my $elapsed = time - $proc{$pid}->{time};
1185 my $Jobstep = $jobstep[$jobstepidx];
1187 my $exitvalue = $childstatus >> 8;
1188 my $exitinfo = "exit ".exit_status_s($childstatus);
1189 $Jobstep->{'arvados_task'}->reload;
1190 my $task_success = $Jobstep->{'arvados_task'}->{success};
1192 Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
1194 if (!defined $task_success) {
1195 # task did not indicate one way or the other --> fail
1196 Log($jobstepidx, sprintf(
1197 "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
1198 exit_status_s($childstatus)));
1199 $Jobstep->{'arvados_task'}->{success} = 0;
1200 $Jobstep->{'arvados_task'}->save;
1207 $temporary_fail ||= $Jobstep->{tempfail};
1208 $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
1210 ++$thisround_failed;
1211 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
1213 # Check for signs of a failed or misconfigured node
1214 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
1215 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
1216 # Don't count this against jobstep failure thresholds if this
1217 # node is already suspected faulty and srun exited quickly
1218 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
1220 Log ($jobstepidx, "blaming failure on suspect node " .
1221 $slot[$proc{$pid}->{slot}]->{node}->{name});
1222 $temporary_fail ||= 1;
1224 ban_node_by_slot($proc{$pid}->{slot});
1227 Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
1228 ++$Jobstep->{'failures'},
1229 $temporary_fail ? 'temporary' : 'permanent',
1232 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
1233 # Give up on this task, and the whole job
1236 # Put this task back on the todo queue
1237 push @jobstep_todo, $jobstepidx;
1238 $Job->{'tasks_summary'}->{'failed'}++;
1242 push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
1243 ++$thisround_succeeded;
1244 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
1245 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
1246 $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
1247 push @jobstep_done, $jobstepidx;
1248 Log ($jobstepidx, "success in $elapsed seconds");
1250 $Jobstep->{exitcode} = $childstatus;
1251 $Jobstep->{finishtime} = time;
1252 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
1253 $Jobstep->{'arvados_task'}->save;
1254 process_stderr_final ($jobstepidx);
1255 Log ($jobstepidx, sprintf("task output (%d bytes): %s",
1256 length($Jobstep->{'arvados_task'}->{output}),
1257 $Jobstep->{'arvados_task'}->{output}));
1259 close $reader{$jobstepidx};
1260 delete $reader{$jobstepidx};
1261 delete $slot[$proc{$pid}->{slot}]->{pid};
1262 push @freeslot, $proc{$pid}->{slot};
1265 $progress_is_dirty = 1;
1268 if (scalar(@successful_task_uuids) > 0)
1270 Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
1272 my $newtask_list = [];
1273 my $newtask_results;
1275 $newtask_results = api_call(
1277 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
1278 'order' => 'qsequence',
1279 'offset' => scalar(@$newtask_list),
1281 push(@$newtask_list, @{$newtask_results->{items}});
1282 } while (@{$newtask_results->{items}});
1283 Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
1284 foreach my $arvados_task (@$newtask_list) {
1286 'level' => $arvados_task->{'sequence'},
1288 'arvados_task' => $arvados_task
1290 push @jobstep, $jobstep;
1291 push @jobstep_todo, $#jobstep;
1295 return $children_reaped;
1298 sub check_refresh_wanted
1300 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1302 $stat[9] > $latest_refresh &&
1303 # ...and we have actually locked the job record...
1304 $job_id eq $Job->{'uuid'}) {
1305 $latest_refresh = scalar time;
1306 my $Job2 = api_call("jobs/get", uuid => $jobspec);
1307 for my $attr ('cancelled_at',
1308 'cancelled_by_user_uuid',
1309 'cancelled_by_client_uuid',
1311 $Job->{$attr} = $Job2->{$attr};
1313 if ($Job->{'state'} ne "Running") {
1314 if ($Job->{'state'} eq "Cancelled") {
1315 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1317 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1320 $main::please_freeze = 1;
1327 my $last_squeue_check = $squeue_checked;
1329 # Do not call `squeue` or check the kill list more than once every
1331 return if $last_squeue_check > time - 15;
1332 $squeue_checked = time;
1334 # Look for children from which we haven't received stderr data since
1335 # the last squeue check. If no such children exist, all procs are
1336 # alive and there's no need to even look at squeue.
1338 # As long as the crunchstat poll interval (10s) is shorter than the
1339 # squeue check interval (15s) this should make the squeue check an
1341 my $silent_procs = 0;
1342 for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
1344 if (!exists($js->{stderr_at}))
1346 $js->{stderr_at} = 0;
1348 if ($js->{stderr_at} < $last_squeue_check)
1353 return if $silent_procs == 0;
1355 # use killem() on procs whose killtime is reached
1356 while (my ($pid, $procinfo) = each %proc)
1358 my $js = $jobstep[$procinfo->{jobstepidx}];
1359 if (exists $procinfo->{killtime}
1360 && $procinfo->{killtime} <= time
1361 && $js->{stderr_at} < $last_squeue_check)
1364 if ($js->{stderr_at}) {
1365 $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
1367 Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
1374 # here is an opportunity to check for mysterious problems with local procs
1378 # Get a list of steps still running. Note: squeue(1) says --steps
1379 # selects a format (which we override anyway) and allows us to
1380 # specify which steps we're interested in (which we don't).
1381 # Importantly, it also changes the meaning of %j from "job name" to
1382 # "step name" and (although this isn't mentioned explicitly in the
1383 # docs) switches from "one line per job" mode to "one line per step"
1384 # mode. Without it, we'd just get a list of one job, instead of a
1386 my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
1389 Log(undef, "warning: squeue exit status $? ($!)");
1394 # which of my jobsteps are running, according to squeue?
1396 for my $jobstepname (@squeue)
1398 $ok{$jobstepname} = 1;
1401 # Check for child procs >60s old and not mentioned by squeue.
1402 while (my ($pid, $procinfo) = each %proc)
1404 if ($procinfo->{time} < time - 60
1405 && $procinfo->{jobstepname}
1406 && !exists $ok{$procinfo->{jobstepname}}
1407 && !exists $procinfo->{killtime})
1409 # According to slurm, this task has ended (successfully or not)
1410 # -- but our srun child hasn't exited. First we must wait (30
1411 # seconds) in case this is just a race between communication
1412 # channels. Then, if our srun child process still hasn't
1413 # terminated, we'll conclude some slurm communication
1414 # error/delay has caused the task to die without notifying srun,
1415 # and we'll kill srun ourselves.
1416 $procinfo->{killtime} = time + 30;
1417 Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
# If a node fails in a multi-node "srun" call during job setup, the call
# may hang instead of exiting with a nonzero code.  This function checks
# "sinfo" for the health of the nodes that were allocated and ensures that
# they are all still in the "alloc" state.  If a node that is allocated to
# this job is not in "alloc" state, then set please_freeze.
#
# This is only called from srun_sync() for node configuration.  If a
# node fails doing actual work, there are other recovery mechanisms.
#
# Takes no arguments.  Side effects: updates $sinfo_checked, may set
# $main::please_freeze, and logs a warning if sinfo itself fails.
sub check_sinfo
{
  # Do not call `sinfo` more than once every 15 seconds.
  return if $sinfo_checked > time - 15;
  $sinfo_checked = time;

  # The output format "%t" means output node states.
  my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
  if ($? != 0)
  {
    Log(undef, "warning: sinfo exit status $? ($!)");
    return;
  }

  foreach my $state (@sinfo)
  {
    # Strip the trailing newline and any padding around the state name.
    $state =~ s/^\s+|\s+$//g;
    next if $state eq '';

    # Node states are strings, so they must be compared with "ne".  The
    # numeric "!=" operator turns every state name into 0, which made
    # this test always false and left failed nodes undetected.
    if ($state ne "alloc" && $state ne "alloc*") {
      $main::please_freeze = 1;
    }
  }
}
1454 sub release_allocation
1458 Log (undef, "release job allocation");
1459 system "scancel $ENV{SLURM_JOB_ID}";
1468 my $sel = IO::Select->new();
1469 foreach my $jobstepidx (keys %reader)
1471 my $fd = $reader{$jobstepidx};
1473 $fd_job{$fd} = $jobstepidx;
1475 if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
1476 $sel->add($stdout_fd);
1477 $fd_job{$stdout_fd} = $jobstepidx;
1480 # select on all reader fds with 0.1s timeout
1481 my @ready_fds = $sel->can_read(0.1);
1482 foreach my $fd (@ready_fds)
1485 if (0 < sysread ($fd, $buf, 65536))
1488 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1490 my $jobstepidx = $fd_job{$fd};
1491 if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
1492 $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
1496 $jobstep[$jobstepidx]->{stderr_at} = time;
1497 $jobstep[$jobstepidx]->{stderr} .= $buf;
1499 # Consume everything up to the last \n
1500 preprocess_stderr ($jobstepidx);
1502 if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
1504 # If we get a lot of stderr without a newline, chop off the
1505 # front to avoid letting our buffer grow indefinitely.
1506 substr ($jobstep[$jobstepidx]->{stderr},
1507 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
# Consume all full lines of stderr for a jobstep.  Everything after the
# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
# returning.
#
# Each complete line is logged, then inspected for known srun / Keep
# error signatures so the task (and possibly its node) can be flagged.
sub preprocess_stderr
{
  my ($jobstepidx) = @_;
  my $task = ($jobstep[$jobstepidx] ||= {});

  # slotindex is only defined for children running Arvados job tasks.
  # Be prepared to handle the undef case (for setup srun calls, etc.).
  my $slotidx = $task->{slotindex};

  # Peel one complete line off the front of the buffer per iteration.
  while ($task->{stderr} =~ s/^(.*?)\n//) {
    my $line = $1;
    Log ($jobstepidx, "stderr $line");

    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
      # If the allocation is revoked, we can't possibly continue, so mark all
      # nodes as failed.  This will cause the overall exit code to be
      # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
      # the job.
      $main::please_freeze = 1;
      $_->{node}->{fail_count}++ for @slot;
      next;
    }

    if ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
      # Node-level failure: retry the task and count it against the node.
      $task->{tempfail} = 1;
      if (defined($slotidx)) {
        $slot[$slotidx]->{node}->{fail_count}++;
        ban_node_by_slot($slotidx);
      }
      next;
    }

    if ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
      # Transient slurm trouble: retry, and back off from this node.
      $task->{tempfail} = 1;
      ban_node_by_slot($slotidx) if (defined($slotidx));
      next;
    }

    if ($line =~ /\bKeep(Read|Write|Request)Error:/) {
      # Keep errors are treated as temporary failures.
      $task->{tempfail} = 1;
    }
  }
}
# Flush whatever stderr is still buffered for a jobstep once its child
# has exited: complete lines are handled by preprocess_stderr(), then any
# trailing text that never received a newline is logged and the buffer
# is emptied.
sub process_stderr_final
{
  my $jobstepidx = shift;
  preprocess_stderr ($jobstepidx);

  # A plain loop instead of map in void context: nothing is collected,
  # and a named loop variable cannot be clobbered through $_ by Log().
  my $leftover = $jobstep[$jobstepidx]->{stderr};
  if (defined($leftover)) {
    foreach my $line (split ("\n", $leftover)) {
      Log ($jobstepidx, "stderr $line");
    }
  }
  $jobstep[$jobstepidx]->{stderr} = '';
}
1572 if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
1573 Log(undef, "fetch_block run error from arv-get $hash: $!");
1576 my $output_block = "";
1579 my $bytes = sysread($keep, $buf, 1024 * 1024);
1580 if (!defined $bytes) {
1581 Log(undef, "fetch_block read error from arv-get: $!");
1582 $output_block = undef;
1584 } elsif ($bytes == 0) {
1585 # sysread returns 0 at the end of the pipe.
1588 # some bytes were read into buf.
1589 $output_block .= $buf;
1594 Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
1595 $output_block = undef;
1597 return $output_block;
1600 # Create a collection by concatenating the output of all tasks (each
1601 # task's output is either a manifest fragment, a locator for a
1602 # manifest fragment stored in Keep, or nothing at all). Return the
1603 # portable_data_hash of the new collection.
1604 sub create_output_collection
1606 Log (undef, "collate");
1608 my ($child_out, $child_in);
1609 my $pid = open2($child_out, $child_in, 'python', '-c', q{
1612 print (arvados.api("v1").collections().
1613 create(body={"manifest_text": sys.stdin.read(),
1614 "owner_uuid": sys.argv[2]}).
1615 execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
1616 }, retry_count(), $Job->{owner_uuid});
1619 my $manifest_size = 0;
1623 my $output = $_->{'arvados_task'}->{output};
1624 next if (!defined($output));
1626 if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
1627 $next_write = fetch_block($output);
1629 $next_write = $output;
1631 if (defined($next_write)) {
1632 if (!defined(syswrite($child_in, $next_write))) {
1633 # There's been an error writing. Stop the loop.
1634 # We'll log details about the exit code later.
1637 $manifest_size += length($next_write);
1640 my $uuid = $_->{'arvados_task'}->{'uuid'};
1641 Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
1646 Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
1649 my $s = IO::Select->new($child_out);
1650 if ($s->can_read(120)) {
1651 sysread($child_out, $joboutput, 1024 * 1024);
1654 Log(undef, "output collection creation exited " . exit_status_s($?));
1660 Log (undef, "timed out while creating output collection");
1661 foreach my $signal (2, 2, 2, 15, 15, 9) {
1662 kill($signal, $pid);
1663 last if waitpid($pid, WNOHANG) == -1;
# Run create_output_collection() and log the outcome.  On success the
# result is also stored as the 'output' attribute of the job record.
# Returns whatever create_output_collection() returned (false on failure).
sub save_output_collection {
  my $collated_output = create_output_collection();

  unless ($collated_output) {
    Log(undef, "Failed to write output collection");
    return $collated_output;
  }

  Log(undef, "job output $collated_output");
  $Job->update_attributes('output' => $collated_output);
  return $collated_output;
}
1691 my $sig = 2; # SIGINT first
1692 if (exists $proc{$_}->{"sent_$sig"} &&
1693 time - $proc{$_}->{"sent_$sig"} > 4)
1695 $sig = 15; # SIGTERM if SIGINT doesn't work
1697 if (exists $proc{$_}->{"sent_$sig"} &&
1698 time - $proc{$_}->{"sent_$sig"} > 4)
1700 $sig = 9; # SIGKILL if SIGTERM doesn't work
1702 if (!exists $proc{$_}->{"sent_$sig"})
1704 Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
1706 select (undef, undef, undef, 0.1);
1709 kill $sig, $_; # srun wants two SIGINT to really interrupt
1711 $proc{$_}->{"sent_$sig"} = time;
1712 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1722 vec($bits,fileno($_),1) = 1;
1728 # Send log output to Keep via arv-put.
1730 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1731 # $log_pipe_out_buf is a string containing all output read from arv-put so far.
1732 # $log_pipe_out_select is an IO::Select object around $log_pipe_out.
1733 # $log_pipe_pid is the pid of the arv-put subprocess.
1735 # The only functions that should access these variables directly are:
1737 # log_writer_start($logfilename)
1738 # Starts an arv-put pipe, reading data on stdin and writing it to
1739 # a $logfilename file in an output collection.
1741 # log_writer_read_output([$timeout])
1742 # Read output from $log_pipe_out and append it to $log_pipe_out_buf.
1743 # Passes $timeout to the select() call, with a default of 0.01.
1744 # Returns the result of the last read() call on $log_pipe_out, or
1745 # -1 if read() wasn't called because select() timed out.
1746 # Only other log_writer_* functions should need to call this.
1748 # log_writer_send($txt)
1749 # Writes $txt to the output log collection.
1751 # log_writer_finish()
1752 # Closes the arv-put pipe and returns the output that it produces.
1754 # log_writer_is_active()
1755 # Returns a true value if there is currently a live arv-put
1756 # process, false otherwise.
1758 my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
1761 sub log_writer_start($)
1763 my $logfilename = shift;
1764 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1768 '--filename', $logfilename,
1770 $log_pipe_out_buf = "";
1771 $log_pipe_out_select = IO::Select->new($log_pipe_out);
1774 sub log_writer_read_output {
1775 my $timeout = shift || 0.01;
1777 while ($read && $log_pipe_out_select->can_read($timeout)) {
1778 $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
1779 length($log_pipe_out_buf));
1781 if (!defined($read)) {
1782 Log(undef, "error reading log manifest from arv-put: $!");
# Write $txt to the arv-put log pipe, then drain any output arv-put has
# produced so far into the log writer's output buffer.
sub log_writer_send($)
{
  my ($txt) = @_;
  print {$log_pipe_in} $txt;
  log_writer_read_output();
}
1794 sub log_writer_finish()
1796 return unless $log_pipe_pid;
1798 close($log_pipe_in);
1800 my $logger_failed = 0;
1801 my $read_result = log_writer_read_output(600);
1802 if ($read_result == -1) {
1803 $logger_failed = -1;
1804 Log (undef, "timed out reading from 'arv-put'");
1805 } elsif ($read_result != 0) {
1806 $logger_failed = -2;
1807 Log(undef, "failed to read arv-put log manifest to EOF");
1810 waitpid($log_pipe_pid, 0);
1812 $logger_failed ||= $?;
1813 Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
1816 close($log_pipe_out);
1817 my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
1818 $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
1819 $log_pipe_out_select = undef;
1821 return $arv_put_output;
1824 sub log_writer_is_active() {
1825 return $log_pipe_pid;
1828 sub Log # ($jobstepidx, $logmessage)
1830 my ($jobstepidx, $logmessage) = @_;
1831 if ($logmessage =~ /\n/) {
1832 for my $line (split (/\n/, $_[1])) {
1833 Log ($jobstepidx, $line);
1837 my $fh = select STDERR; $|=1; select $fh;
1839 if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
1840 $task_qseq = $jobstepidx;
1842 my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
1843 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1846 if (log_writer_is_active() || -t STDERR) {
1847 my @gmtime = gmtime;
1848 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1849 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1851 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1853 if (log_writer_is_active()) {
1854 log_writer_send($datetime . " " . $message);
1861 my ($package, $file, $line) = caller;
1862 my $message = "@_ at $file line $line\n";
1863 Log (undef, $message);
1864 release_allocation();
1865 freeze() if @jobstep_todo;
1866 create_output_collection() if @jobstep_todo;
1876 if ($Job->{'state'} eq 'Cancelled') {
1877 $Job->update_attributes('finished_at' => scalar gmtime);
1879 $Job->update_attributes('state' => 'Failed');
1886 my $justcheckpoint = shift; # false if this will be the last meta saved
1887 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1888 return unless log_writer_is_active();
1889 my $log_manifest = log_writer_finish();
1890 return unless defined($log_manifest);
1893 my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
1894 $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
1897 my $log_coll = api_call(
1898 "collections/create", ensure_unique_name => 1, collection => {
1899 manifest_text => $log_manifest,
1900 owner_uuid => $Job->{owner_uuid},
1901 name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
1903 Log(undef, "log collection is " . $log_coll->{portable_data_hash});
1904 $Job->update_attributes('log' => $log_coll->{portable_data_hash});
1906 return $log_coll->{portable_data_hash};
1910 sub freeze_if_want_freeze
1912 if ($main::please_freeze)
1914 release_allocation();
1917 # kill some srun procs before freeze+stop
1918 map { $proc{$_} = {} } @_;
1921 killem (keys %proc);
1922 select (undef, undef, undef, 0.1);
1924 while (($died = waitpid (-1, WNOHANG)) > 0)
1926 delete $proc{$died};
1931 create_output_collection();
1941 Log (undef, "Freeze not implemented");
1948 croak ("Thaw not implemented");
1964 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1970 my $srunargs = shift;
1971 my $execargs = shift;
1972 my $opts = shift || {};
1975 my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
1976 Log (undef, "$label: start");
1978 my ($stderr_r, $stderr_w);
1979 pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
1981 my ($stdout_r, $stdout_w);
1982 pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
1984 my $srunpid = fork();
1989 fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
1990 fcntl($stdout_w, F_SETFL, 0) or croak($!);
1991 open(STDERR, ">&", $stderr_w);
1992 open(STDOUT, ">&", $stdout_w);
1993 srun ($srunargs, $execargs, $opts, $stdin);
1999 set_nonblocking($stderr_r);
2000 set_nonblocking($stdout_r);
2002 # Add entries to @jobstep and %proc so check_squeue() and
2003 # freeze_if_want_freeze() can treat it like a job task process.
2007 stderr_captured => '',
2008 stdout_r => $stdout_r,
2009 stdout_captured => '',
2011 my $jobstepidx = $#jobstep;
2013 jobstepidx => $jobstepidx,
2015 $reader{$jobstepidx} = $stderr_r;
2017 while ($srunpid != waitpid ($srunpid, WNOHANG)) {
2018 my $busy = readfrompipes();
2019 if (!$busy || ($latest_refresh + 2 < scalar time)) {
2020 check_refresh_wanted();
2025 select(undef, undef, undef, 0.1);
2027 killem(keys %proc) if $main::please_freeze;
2031 1 while readfrompipes();
2032 process_stderr_final ($jobstepidx);
2034 Log (undef, "$label: exit ".exit_status_s($exited));
2038 delete $proc{$srunpid};
2039 delete $reader{$jobstepidx};
2041 my $j = pop @jobstep;
2042 # If the srun showed signs of tempfail, ensure the caller treats that as a
2044 if ($main::please_freeze || $j->{tempfail}) {
2047 return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
2053 my $srunargs = shift;
2054 my $execargs = shift;
2055 my $opts = shift || {};
2057 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
2059 $Data::Dumper::Terse = 1;
2060 $Data::Dumper::Indent = 0;
2061 my $show_cmd = Dumper($args);
2062 $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
2063 $show_cmd =~ s/\n/ /g;
2064 if ($opts->{fork}) {
2065 Log(undef, "starting: $show_cmd");
2067 # This is a child process: parent is in charge of reading our
2068 # stderr and copying it to Log() if needed.
2069 warn "starting: $show_cmd\n";
2072 if (defined $stdin) {
2073 my $child = open STDIN, "-|";
2074 defined $child or die "no fork: $!";
2076 print $stdin or die $!;
2077 close STDOUT or die $!;
2082 return system (@$args) if $opts->{fork};
2085 warn "ENV size is ".length(join(" ",%ENV));
2086 die "exec failed: $!: @$args";
# Put the node behind slot $slotid on hold: don't start any new jobsteps
# on this node for 60 seconds.  Also bumps the node's hold_count and logs
# the back-off.
sub ban_node_by_slot {
  my ($slotid) = @_;
  my $node = ($slot[$slotid]->{node} ||= {});

  $node->{hold_until} = 60 + scalar time;
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for 60 seconds");
}
2100 my ($lockfile, $error_message) = @_;
2101 open L, ">", $lockfile or croak("$lockfile: $!");
2102 if (!flock L, LOCK_EX|LOCK_NB) {
2103 croak("Can't lock $lockfile: $error_message\n");
# Given a Keep locator, check to see if it contains a Docker image.
#
# The collection qualifies only if its manifest lists exactly one file
# and that file is named <64 hex digits>.tar, optionally prefixed with
# "sha256:".
#
# Returns ($streamname, $docker_hash) on success, where $docker_hash is
# the file name without ".tar"; returns (undef, undef) otherwise.
sub find_docker_image {
  my $locator = shift;
  my ($streamname, $filename);
  my $image = api_call("collections/get", uuid => $locator);

  # Nothing to inspect if the lookup produced no collection or manifest.
  if (!$image || !defined($image->{manifest_text})) {
    return (undef, undef);
  }

  foreach my $line (split(/\n/, $image->{manifest_text})) {
    my @tokens = split(/\s+/, $line);
    # Skip blank / whitespace-only lines so they cannot reset $streamname.
    next if (!@tokens);
    $streamname = shift(@tokens);
    foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
      if (defined($filename)) {
        return (undef, undef);  # More than one file in the Collection.
      }
      # File tokens look like "position:size:name"; keep the name intact
      # even if it contains further colons (e.g. "sha256:...").
      $filename = (split(/:/, $filedata, 3))[2];
    }
  }

  if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
    return ($streamname, $1);
  }
  return (undef, undef);
}
# Calculate the number of times an operation should be retried,
# assuming exponential backoff, and that we're willing to retry as
# long as tasks have been running.  Enforce a minimum of 3 retries.
#
# The time budget is derived from the first jobstep's starttime and the
# last jobstep's finishtime:
#   * no start time recorded     -> budget 0
#   * started but not finished   -> seconds elapsed since the start
#   * finished                   -> run duration minus the time that has
#                                   passed since the finish
# Returns int(log2(budget)), or 3 if that would be smaller.
sub retry_count {
  my ($starttime, $endtime);

  # Only look at @jobstep when it has entries: indexing an empty array
  # with ->{...} would autovivify a phantom jobstep at [0] and die on [-1].
  if (@jobstep) {
    $starttime = $jobstep[0]->{starttime};
    $endtime = $jobstep[-1]->{finishtime};
  }

  my $timediff;
  if (!defined($starttime)) {
    $timediff = 0;
  } elsif (!defined($endtime)) {
    $timediff = time - $starttime;
  } else {
    $timediff = ($endtime - $starttime) - (time - $endtime);
  }

  my $retries;
  if ($timediff > 0) {
    $retries = int(log($timediff) / log(2));
  } else {
    $retries = 1;  # Use the minimum.
  }
  return ($retries > 3) ? $retries : 3;
}
# Pass in two function references.
# The first will be called with the remaining arguments.
# If it dies, retry it with exponential backoff until it succeeds,
# or until the current retry_count is exhausted.  After each failure
# that can be retried, the second function will be called with
# the current try count (0-based), next try time, and error message.
#
# Returns the operation's (scalar) result on success; dies with the last
# error message once every attempt has failed.
sub retry_op {
  my $operation = shift;
  my $retry_callback = shift;
  my $max_retries = retry_count();

  for (my $attempt = 0; $attempt <= $max_retries; $attempt++) {
    # Attempt N is followed by a wait of up to 2**N seconds, measured
    # from the moment the attempt began.
    my $next_try = time + (2 ** $attempt);
    my $result = eval { $operation->(@_); };
    return $result unless $@;

    # The final failure falls through to the die below without a callback.
    next if $attempt >= $max_retries;

    $retry_callback->($attempt, $next_try, $@);
    my $sleep_time = $next_try - time;
    sleep($sleep_time) if ($sleep_time > 0);
  }

  # Ensure the error message ends in a newline, so Perl doesn't add
  # retry_op's line number to it.
  chomp($@);
  die($@ . "\n");
}
2187 # Pass in a /-separated API method name, and arguments for it.
2188 # This function will call that method, retrying as needed until
2189 # the current retry_count is exhausted, with a log on the first failure.
2190 my $method_name = shift;
2191 my $log_api_retry = sub {
2192 my ($try_count, $next_try_at, $errmsg) = @_;
2193 $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
2194 $errmsg =~ s/\s/ /g;
2195 $errmsg =~ s/\s+$//;
2197 if ($next_try_at < time) {
2198 $retry_msg = "Retrying.";
2200 my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
2201 $retry_msg = "Retrying at $next_try_fmt.";
2203 Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
2206 foreach my $key (split(/\//, $method_name)) {
2207 $method = $method->{$key};
2209 return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".
#
# The exit value is taken from the high byte, the signal number from the
# low 7 bits, and bit 0x80 adds " with core dump".
sub exit_status_s {
  my ($wait_status) = @_;
  my $signal = $wait_status & 0x7f;

  my @parts = ($wait_status >> 8);
  push @parts, "with signal " . $signal if $signal;
  push @parts, "with core dump" if ($wait_status & 0x80);

  return join(" ", @parts);
}
sub handle_readall {
  # Pass in a glob reference to a file handle.
  # Read all its contents and return them as a string.
  my $fh_glob_ref = shift;
  # Clear the input record separator for the duration of this call so a
  # single read returns the whole remaining stream.  Without this, a
  # scalar-context caller would only receive the first line.
  local $/ = undef;
  return <$fh_glob_ref>;
}
# Return the path of the Nth saved git archive for this job:
# $CRUNCH_TMP/git.<job id>.<N>.tar
sub tar_filename_n {
  my ($n) = @_;
  my $tmpdir = $ENV{CRUNCH_TMP};
  return sprintf("%s/git.%s.%d.tar", $tmpdir, $job_id, $n);
}
2239 sub add_git_archive {
2240 # Pass in a git archive command as a string or list, a la system().
2241 # This method will save its output to be included in the archive sent to the
2245 if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
2246 croak("Failed to save git archive: $!");
2248 my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
2250 waitpid($git_pid, 0);
2253 croak("Failed to save git archive: git exited " . exit_status_s($?));
sub combined_git_archive {
  # Combine all saved tar archives into a single archive, then return its
  # contents in a string.  Return undef if no archives have been saved.
  #
  # Archives 2..$git_tar_count are appended onto archive 1 in place with
  # "tar -A"; croaks if tar fails or the result cannot be opened.
  if ($git_tar_count < 1) {
    return undef;
  }
  my $base_tar_name = tar_filename_n(1);
  foreach my $archive_num (2..$git_tar_count) {
    my $tar_to_append = tar_filename_n($archive_num);
    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
    if ($tar_exit != 0) {
      croak("Error preparing build archive: tar -A exited " .
            exit_status_s($tar_exit));
    }
  }

  # A lexical handle is closed reliably and cannot collide with other
  # users of a global bareword handle.
  my $tar_fh;
  if (!open($tar_fh, "<", $base_tar_name)) {
    croak("Could not open build archive: $!");
  }
  binmode($tar_fh);  # tar data is binary; read it without any I/O layers

  # Slurp the whole file: with $/ undefined, one read returns everything.
  my $tar_contents = do { local $/ = undef; <$tar_fh> };
  close($tar_fh);
  return $tar_contents;
}
# Switch the given file handle to non-blocking mode by adding O_NONBLOCK
# to its current status flags.  Croaks with the OS error if either
# fcntl() call fails.
sub set_nonblocking {
  my ($fh) = @_;

  # fcntl() reports a zero result as "0 but true", so a false value here
  # always means failure.
  my $flags = fcntl ($fh, F_GETFL, 0);
  croak ($!) unless $flags;

  my $set_ok = fcntl ($fh, F_SETFL, $flags | O_NONBLOCK);
  croak ($!) unless $set_ok;
  return $set_ok;
}
2288 # This is crunch-job's internal dispatch script. crunch-job running on the API
2289 # server invokes this script on individual compute nodes, or localhost if we're
2290 # running a job locally. It gets called in two modes:
2292 # * No arguments: Installation mode. Read a tar archive from the DATA
2293 # file handle; it includes the Crunch script's source code, and
2294 # maybe SDKs as well. Those should be installed in the proper
2295 # locations. This runs outside of any Docker container, so don't try to
2296 # introspect Crunch's runtime environment.
2298 # * With arguments: Crunch script run mode. This script should set up the
2299 # environment, then run the command specified in the arguments. This runs
2300 # inside any Docker container.
2303 use File::Path qw( make_path remove_tree );
2304 use POSIX qw(getcwd);
# Numeric code 111; handed to shell_or_die as its first argument for the
# pip install step. (Name suggests "temporary failure" -- TODO confirm how
# the caller interprets it.)
2306 use constant TASK_TEMPFAIL => 111;
2308 # Map SDK subdirectories to the path environments they belong to.
2309 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
# Paths and identifiers are supplied by the invoking process through the
# environment.
2311 my $destdir = $ENV{"CRUNCH_SRC"};
2312 my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
2313 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Fall back to "opt" under the current working directory when
# CRUNCH_INSTALL is unset or false.
2314 my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
2315 my $job_work = $ENV{"JOB_WORK"};
2316 my $task_work = $ENV{"TASK_WORK"};
# Keep duplicates of the original stdout/stderr: installation mode later
# reopens STDOUT/STDERR onto $destdir.log, and error reporting restores
# STDERR from STDERR_ORIG.
2318 open(STDOUT_ORIG, ">&", STDOUT);
2319 open(STDERR_ORIG, ">&", STDERR);
# Verify each working directory; die if one does not exist.
2321 for my $dir ($destdir, $job_work, $task_work) {
2324 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Empty the per-task work directory while keeping the directory itself.
2329 remove_tree($task_work, {keep_root => 1});
2332 ### Crunch script run mode
# Prepares the environment (Python SDK virtualenv, Perl/Ruby library
# paths) for the command held in @ARGV, logging progress via $Log.
2334 # We want to do routine logging during task 0 only. This gives the user
2335 # the information they need, but avoids repeating the information for every
# TASK_SEQUENCE is compared as a string, so only the exact value "0" matches.
2338 if ($ENV{TASK_SEQUENCE} eq "0") {
# $msg is interpolated into the printf format itself, so callers may pass
# printf-style placeholders with the values following in @_. Output goes to
# the saved original stderr.
2341 printf STDERR_ORIG "[Crunch] $msg\n", @_;
# Python SDK source location and the per-job virtualenv location.
2347 my $python_src = "$install_dir/python";
2348 my $venv_dir = "$job_work/.arvados.venv";
# The virtualenv counts as built once its activate script exists.
2349 my $venv_built = -e "$venv_dir/bin/activate";
# Build the virtualenv only when it is missing, the SDK source directory is
# present, and can_run("virtualenv") succeeds.
2350 if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
2351 shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
2352 "--python=python2.7", $venv_dir);
# Install the SDK into the new virtualenv; a failure here is reported to
# shell_or_die with TASK_TEMPFAIL rather than undef.
2353 shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
2355 $Log->("Built Python SDK virtualenv");
# Command that asks pkg_resources for the installed arvados-python-client
# version. The Python snippet uses the Python 2 print statement.
2358 my @pysdk_version_cmd = ("python", "-c",
2359 "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
2361 $Log->("Running in Python SDK virtualenv");
# Clearing the command disables the version probe further down.
2362 @pysdk_version_cmd = ();
# Backslash-escape every original argument with quotemeta, then replace
# @ARGV with a /bin/sh -ec invocation that sources the virtualenv's activate
# script before exec-ing the original command.
2363 my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
2364 @ARGV = ("/bin/sh", "-ec",
2365 ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
# SDK source exists but the earlier branches did not apply: warn only.
2366 } elsif (-d $python_src) {
2367 $Log->("Warning: virtualenv not found inside Docker container default " .
2368 "\$PATH. Can't install Python SDK.");
# Probe the SDK version only if the command was left populated above.
2371 if (@pysdk_version_cmd) {
# NOTE(review): the result of this open is not checked on this line.
2372 open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
# Only the first line of the probe's output is read.
2373 my $pysdk_version = <$pysdk_version_pipe>;
2374 close($pysdk_version_pipe);
2376 chomp($pysdk_version);
2377 $Log->("Using Arvados SDK version $pysdk_version");
2379 # A lot could've gone wrong here, but pretty much all of it means that
2380 # Python won't be able to load the Arvados SDK.
2381 $Log->("Warning: Arvados SDK not found");
# For each SDK subdirectory in %SDK_ENVVARS, put its path at the front of
# the matching environment variable (PERLLIB, RUBYLIB).
2385 while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
2386 my $sdk_path = "$install_dir/$sdk_dir";
# Prepend with a ":" separator when the variable already has a true value;
# otherwise set it to the SDK path alone.
2388 if ($ENV{$sdk_envkey}) {
2389 $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
2391 $ENV{$sdk_envkey} = $sdk_path;
2393 $Log->("Arvados SDK added to %s", $sdk_envkey);
# Fatal: report that the command in @ARGV could not be exec'd, with the OS
# error.
2398 die "Cannot exec `@ARGV`: $!";
2401 ### Installation mode
# Unpacks the archive read from DATA into $destdir, moves bundled SDKs into
# $install_dir, runs the source tree's install script if there is one, and
# records the installed archive hash.
# Open the lock file next to $destdir for writing; die if that fails.
2402 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Skip reinstalling when the recorded hash symlink already points at
# $archive_hash and $destdir exists as a directory.
2404 if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
2405 # This exact git archive (source + arvados sdk) is already installed
2406 # here, so there's no need to reinstall it.
2408 # We must consume our DATA section, though: otherwise the process
2409 # feeding it to us will get SIGPIPE.
# Read DATA in 64 KiB chunks and discard them.
2411 while (read(DATA, $buf, 65536)) { }
# Drop the existing hash marker; a fresh one is written at the end of
# installation mode when $archive_hash is set.
2416 unlink "$destdir.archive_hash";
2420 # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
2421 local $SIG{PIPE} = "IGNORE";
2422 warn "Extracting archive: $archive_hash\n";
2423 # --ignore-zeros is necessary sometimes: depending on how much NUL
2424 # padding tar -A put on our combined archive (which in turn depends
2425 # on the length of the component archives) tar without
2426 # --ignore-zeros will exit before consuming stdin and cause close()
2427 # to fail on the resulting SIGPIPE.
# List-form pipe open: tar is started directly (no shell) with its stdin
# connected to TARX.
2428 if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
2429 die "Error launching 'tar -xC $destdir': $!";
2431 # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
2432 # get SIGPIPE. We must feed it data incrementally.
# Copy DATA to tar in 64 KiB chunks.
2434 while (read(DATA, $tar_input, 65536)) {
2435 print TARX $tar_input;
# Fatal: report tar's wait status ($?) and the OS error.
2438 die "'tar -xC $destdir' exited $?: $!";
# SDKs shipped inside the archive live under this directory.
2444 my $sdk_root = "$destdir/.arvados.sdk/sdk";
# The language list is "python" plus the first path component of every
# %SDK_ENVVARS key ("perl", "ruby").
2446 foreach my $sdk_lang (("python",
2447 map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
# Move each bundled SDK directory that exists into $install_dir.
2448 if (-d "$sdk_root/$sdk_lang") {
2449 if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
2450 die "Failed to install $sdk_lang SDK: $!";
2456 my $python_dir = "$install_dir/python";
# Pre-check the Python SDK's packaging metadata when the SDK directory
# exists and can_run("python2.7") succeeds.
2457 if ((-d $python_dir) and can_run("python2.7")) {
# Single-string command, so it runs through the shell. "2>&1 >/dev/null"
# sends stderr to the pipe and discards stdout, so only error output is
# captured.
2458 open(my $egg_info_pipe, "-|",
2459 "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
2460 my @egg_info_errors = <$egg_info_pipe>;
2461 close($egg_info_pipe);
# Inspect only the last error line: a mention of "git" or "[Errno 2]" is
# treated as the build-tag lookup failing.
2464 if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
2465 # egg_info apparently failed because it couldn't ask git for a build tag.
2466 # Specify no build tag.
# Append an [egg_info] section with an empty tag_build to setup.cfg.
# NOTE(review): the result of this open is not checked on this line.
2467 open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
2468 print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
# High byte of the wait status in $? is the exit code; presumably $? still
# reflects close($egg_info_pipe) here -- TODO confirm.
2471 my $egg_info_exit = $? >> 8;
2472 foreach my $errline (@egg_info_errors) {
2475 warn "python setup.py egg_info failed: exit $egg_info_exit";
# Exit with egg_info's exit code, or 1 when that code is 0.
2476 exit ($egg_info_exit || 1);
2481 # Hide messages from the install script (unless it fails: shell_or_die
2482 # will show $destdir.log in that case).
# STDOUT is reopened in append mode on the log; STDERR is pointed at the
# same place.
2483 open(STDOUT, ">>", "$destdir.log");
2484 open(STDERR, ">&", STDOUT);
# Run the first install script that applies, passing $install_dir as its
# only argument: crunch_scripts/install under $destdir, else
# ./tests/autotests.sh (only when ./install.sh is absent), else ./install.sh.
2486 if (-e "$destdir/crunch_scripts/install") {
2487 shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
2488 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
2490 shell_or_die (undef, "./tests/autotests.sh", $install_dir);
2491 } elsif (-e "./install.sh") {
2492 shell_or_die (undef, "./install.sh", $install_dir);
# Record the installed archive hash as the target of a symlink. The link is
# created under a ".new" name and then renamed over the final name.
2495 if ($archive_hash) {
2496 unlink "$destdir.archive_hash.new";
2497 symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
2498 rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
# First argument: the name of the command to look up.
2504 my $command_name = shift;
# Run "which <name>" with its output on a pipe. The list form of open starts
# the program directly, without a shell.
2505 open(my $which, "-|", "which", $command_name);
# Read and discard everything "which" prints.
2506 while (<$which>) { }
# First argument: an exit code, or undef. After the shift, the rest of @_
# is the command and its arguments.
2513 my $exitcode = shift;
# With DEBUG set in the environment, echo the command to stderr before
# running it.
2515 if ($ENV{"DEBUG"}) {
2516 print STDERR "@_\n";
# List-form system runs the command directly; any non-zero status takes the
# failure path.
2518 if (system (@_) != 0) {
# Decode the wait status: the high byte is the exit code, the low 7 bits
# are the signal number. ($code and $err are assigned outside the visible
# lines -- presumably from $? and $!; TODO confirm.)
2521 my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
# Point STDERR back at the saved original stderr so the output below is not
# written to the log file.
2522 open STDERR, ">&STDERR_ORIG";
# Dump the accumulated install log to stderr.
# NOTE(review): $destdir is interpolated into a shell command unquoted.
2523 system ("cat $destdir.log >&2");
2524 warn "@_ failed ($err): $exitstatus";
# Branch on whether the caller supplied an exit code as the first argument.
2525 if (defined($exitcode)) {
# Exit with the command's exit code, or 1 when that code is 0.
2529 exit (($code >> 8) || 1);