-#!/usr/bin/perl
+#!/usr/bin/env perl
# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
=head1 NAME
use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
+use Cwd qw(realpath);
+use Data::Dumper;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
+use constant TASK_TEMPFAIL => 111;
use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
$ENV{"TMPDIR"} ||= "/tmp";
unless (defined $ENV{"CRUNCH_TMP"}) {
}
}
+$ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
+unless (defined($ENV{"HOST_CERTS"})) {
+ if (-f "/etc/ssl/certs/ca-certificates.crt") {
+ $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
+ } elsif (-f "/etc/pki/tls/certs/ca-bundle.crt") {
+ $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
+ }
+}
+
# Create the tmp directory if it does not exist
if ( ! -d $ENV{"CRUNCH_TMP"} ) {
make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
+my %proc;
my $force_unlock;
my $git_dir;
my $jobspec;
my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
+my $cgroup_root = "/sys/fs/cgroup";
+my $docker_bin = "docker.io";
+my $docker_run_args = "";
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job=s' => \$jobspec,
'job-api-token=s' => \$job_api_token,
'no-clear-tmp' => \$no_clear_tmp,
'resume-stash=s' => \$resume_stash,
+ 'cgroup-root=s' => \$cgroup_root,
+ 'docker-bin=s' => \$docker_bin,
+ 'docker-run-args=s' => \$docker_run_args,
);
if (defined $job_api_token) {
$ENV{ARVADOS_API_TOKEN} = $job_api_token;
}
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
+my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
$SIG{'USR1'} = sub
$main::ENV{CRUNCH_DEBUG} = 0;
};
-
-
my $arv = Arvados->new('apiVersion' => 'v1');
my $Job;
my $sth;
my @jobstep;
-my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
-
+my $local_job;
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
- $Job = retry_op(sub {
- $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
- });
+ $Job = api_call("jobs/get", uuid => $jobspec);
+ $local_job = 0;
+}
+else
+{
+ $local_job = JSON::decode_json($jobspec);
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if (($Job || $local_job)->{docker_image_locator}) {
+ $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+ $cmd,
+ {label => "sanity check"});
+if ($exited != 0) {
+ Log(undef, "Sanity check failed: ".exit_status_s($exited));
+ exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
- eval { retry_op(sub {
- # lock() sets is_locked_by_uuid and changes state to Running.
- $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
- }); };
+ eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
if ($@) {
Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
}
else
{
- $Job = JSON::decode_json($jobspec);
-
if (!$resume_stash)
{
- map { croak ("No $_ specified") unless $Job->{$_} }
+ map { croak ("No $_ specified") unless $local_job->{$_} }
qw(script script_version script_parameters);
}
- $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
- $Job->{'started_at'} = gmtime;
- $Job->{'state'} = 'Running';
+ $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
+ $local_job->{'started_at'} = gmtime;
+ $local_job->{'state'} = 'Running';
- $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
+ $Job = api_call("jobs/create", job => $local_job);
}
$job_id = $Job->{'uuid'};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
+my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
+if ($? == 0) {
+ $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
+ chomp($gem_versions);
+ chop($gem_versions); # Closing parentheses
+} else {
+ $gem_versions = "";
+}
+Log(undef,
+ "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
Log (undef, "check slurm allocation");
my @slot;
{
Log (undef, "node $nodename - $ncpus slots");
my $node = { name => $nodename,
- ncpus => $ncpus,
- losing_streak => 0,
- hold_until => 0 };
+ ncpus => $ncpus,
+ # The number of consecutive times a task has been dispatched
+ # to this node and failed.
+ losing_streak => 0,
+ # The number of consecutive times that SLURM has reported
+ # a node failure since the last successful task.
+ fail_count => 0,
+ # Don't dispatch work to this node until this time
+ # (in seconds since the epoch) has passed.
+ hold_until => 0 };
foreach my $cpu (1..$ncpus)
{
push @slot, { node => $node,
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
-my $output_in_keep = 0;
+my $squeue_checked = 0;
my $latest_refresh = scalar time;
}
else
{
- my $first_task = retry_op(sub {
- $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
- 'job_uuid' => $Job->{'uuid'},
- 'sequence' => 0,
- 'qsequence' => 0,
- 'parameters' => {},
- });
+ my $first_task = api_call("job_tasks/create", job_task => {
+ 'job_uuid' => $Job->{'uuid'},
+ 'sequence' => 0,
+ 'qsequence' => 0,
+ 'parameters' => {},
});
push @jobstep, { 'level' => 0,
'failures' => 0,
must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}
-
-my $build_script;
-do {
- local $/ = undef;
- $build_script = <DATA>;
-};
+my $build_script = handle_readall(\*DATA);
my $nodelist = join(",", @node);
+my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
- # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
- Log (undef, "Clean work dirs");
+ # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
+ # up work directories crunch_tmp/work, crunch_tmp/opt,
+ # crunch_tmp/src*.
+ #
+ # TODO: When #5036 is done and widely deployed, we can limit mount's
+ # -t option to simply fuse.keep.
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ {label => "clean work dirs"});
+ if ($exited != 0) {
+ exit(EX_RETRY_UNLOCKED);
+ }
+}
- my $cleanpid = fork();
- if ($cleanpid == 0)
+# If this job requires a Docker image, install that.
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
+if ($docker_locator = $Job->{docker_image_locator}) {
+ Log (undef, "Install docker image $docker_locator");
+ ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
+ if (!$docker_hash)
{
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
- exit (1);
+ croak("No Docker image hash found from locator $docker_locator");
}
- while (1)
+ Log (undef, "docker image hash is $docker_hash");
+ $docker_stream =~ s/^\.//;
+ my $docker_install_script = qq{
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
+ arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+fi
+};
+
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . join(',', @node)],
+ ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+ {label => "load docker image"});
+ if ($exited != 0)
{
- last if $cleanpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($cleanpid);
- select (undef, undef, undef, 0.1);
+ exit(EX_RETRY_UNLOCKED);
+ }
+
+ # Determine whether this version of Docker supports memory+swap limits.
+ ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=1"],
+ [$docker_bin, 'run', '--help'],
+ {label => "check --memory-swap feature"});
+ $docker_limitmem = ($stdout =~ /--memory-swap/);
+
+ # Find a non-root Docker user to use.
+ # Tries the default user for the container, then 'crunch', then 'nobody',
+ # testing for whether the actual user id is non-zero. This defends against
+ # mistakes but not malice, but we intend to harden the security in the future
+ # so we don't want anyone getting used to their jobs running as root in their
+ # Docker containers.
+ my @tryusers = ("", "crunch", "nobody");
+ foreach my $try_user (@tryusers) {
+ my $label;
+ my $try_user_arg;
+ if ($try_user eq "") {
+ $label = "check whether default user is UID 0";
+ $try_user_arg = "";
+ } else {
+ $label = "check whether user '$try_user' is UID 0";
+ $try_user_arg = "--user=$try_user";
+ }
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=1"],
+ ["/bin/sh", "-ec",
+ "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+ {label => $label});
+ chomp($stdout);
+ if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
+ $dockeruserarg = $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Container will run with default user");
+ } else {
+ Log(undef, "Container will run with $dockeruserarg");
+ }
+ last;
+ }
}
- Log (undef, "Cleanup command exited ".exit_status_s($?));
-}
+ if (!defined $dockeruserarg) {
+ croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+ }
+
+ if ($Job->{arvados_sdk_version}) {
+ # The job also specifies an Arvados SDK version. Add the SDKs to the
+ # tar file for the build script to install.
+ Log(undef, sprintf("Packing Arvados SDK version %s for installation",
+ $Job->{arvados_sdk_version}));
+ add_git_archive("git", "--git-dir=$git_dir", "archive",
+ "--prefix=.arvados.sdk/",
+ $Job->{arvados_sdk_version}, "sdk");
+ }
+}
-my $git_archive;
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
# If script_version looks like an absolute path, *and* the --git-dir
# argument was not given -- which implies we were not invoked by
} else {
# $repo is none of the above. It must be the name of a hosted
# repository.
- my $arv_repo_list = retry_op(sub {
- $arv->{'repositories'}->{'list'}->execute(
- 'filters' => [['name','=',$repo]]);
- });
+ my $arv_repo_list = api_call("repositories/list",
+ 'filters' => [['name','=',$repo]]);
my @repos_found = @{$arv_repo_list->{'items'}};
my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
if ($n_found > 0) {
unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
croak("`$gitcmd rev-list` exited "
.exit_status_s($?)
- .", '$treeish' not found. Giving up.");
+ .", '$treeish' not found, giving up");
}
$commit = $1;
Log(undef, "Version $treeish is commit $commit");
}
$ENV{"CRUNCH_SRC_COMMIT"} = $commit;
- $git_archive = `$gitcmd archive ''\Q$commit\E`;
- if ($?) {
- croak("Error: $gitcmd archive exited ".exit_status_s($?));
- }
+ add_git_archive("$gitcmd archive ''\Q$commit\E");
}
+my $git_archive = combined_git_archive();
if (!defined $git_archive) {
Log(undef, "Skip install phase (no git archive)");
if ($have_slurm) {
}
}
else {
- Log(undef, "Run install script on all workers");
-
- my @srunargs = ("srun",
- "--nodelist=$nodelist",
- "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
- my @execargs = ("sh", "-c",
- "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-
- # Note: this section is almost certainly unnecessary if we're
- # running tasks in docker containers.
- my $installpid = fork();
- if ($installpid == 0)
- {
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- while (1)
- {
- last if $installpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($installpid);
- select (undef, undef, undef, 0.1);
- }
- Log (undef, "Install script exited ".exit_status_s($?));
-}
+ my $exited;
+ my $install_script_tries_left = 3;
+ for (my $attempts = 0; $attempts < 3; $attempts++) {
+ my @srunargs = ("srun",
+ "--nodelist=$nodelist",
+ "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+ my @execargs = ("sh", "-c",
+ "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+ $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+ my ($stdout, $stderr);
+ ($exited, $stdout, $stderr) = srun_sync(
+ \@srunargs, \@execargs,
+ {label => "run install script on all workers"},
+ $build_script . $git_archive);
+
+ my $stderr_anything_from_script = 0;
+ for my $line (split(/\n/, $stderr)) {
+ if ($line !~ /^(srun: error: |starting: \[)/) {
+ $stderr_anything_from_script = 1;
+ }
+ }
-if (!$have_slurm)
-{
- # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
- must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-}
+ last if $exited == 0 || $main::please_freeze;
-# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
-if ($docker_locator = $Job->{docker_image_locator}) {
- ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
- if (!$docker_hash)
- {
- croak("No Docker image hash found from locator $docker_locator");
+ # If the install script fails but doesn't print an error message,
+ # the next thing anyone is likely to do is just run it again in
+ # case it was a transient problem like "slurm communication fails
+ # because the network isn't reliable enough". So we'll just do
+ # that ourselves (up to 3 attempts in total). OTOH, if there is an
+ # error message, the problem is more likely to have a real fix and
+ # we should fail the job so the fixing process can start, instead
+ # of doing 2 more attempts.
+ last if $stderr_anything_from_script;
}
- $docker_stream =~ s/^\.//;
- my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-fi
-};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
+
+ foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+ unlink($tar_filename);
}
- if ($? != 0)
- {
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+
+ if ($exited != 0) {
+ croak("Giving up");
}
}
my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+ $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers. Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level. This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+ @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+ @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+ @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
-my %proc;
-my @freeslot = (0..$#slot);
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+ my $this_slot = $slot[$freeslot[$ii]];
+ my $node_name = $this_slot->{node}->{name};
+ $round_max_slots{$node_name} ||= $this_slot->{cpu};
+ last if (scalar(keys(%round_max_slots)) >= @node);
+}
+
+Log(undef, "start level $level with $round_num_freeslots slots");
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
update_progress_stats();
-
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
+ # Don't create new tasks if we already know the job's final result.
+ last if defined($main::success);
+
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)
next;
}
- pipe $reader{$id}, "writer" or croak ($!);
- my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
- fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+ pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+ set_nonblocking($reader{$id});
my $childslot = $freeslot[0];
my $childnode = $slot[$childslot]->{node};
my $childslotname = join (".",
$slot[$childslot]->{node}->{name},
$slot[$childslot]->{cpu});
+
my $childpid = fork();
if ($childpid == 0)
{
}
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
- $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+ $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
$ENV{"HOME"} = $ENV{"TASK_WORK"};
- $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
- $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+ $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
+ my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
+
$ENV{"GZIP"} = "-n";
my @srunargs = (
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
- my $build_script_to_send = "";
- my $command =
- "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
- ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
- ."&& cd $ENV{CRUNCH_TMP} ";
- if ($build_script)
- {
- $build_script_to_send = $build_script;
- $command .=
- "&& perl -";
+
+ my $stdbuf = " stdbuf --output=0 --error=0 ";
+
+ my $arv_file_cache = "";
+ if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
+ $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
}
- $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+
+ my $command =
+ "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
+ ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
+ ."&& cd \Q$ENV{CRUNCH_TMP}\E "
+ # These environment variables get used explicitly later in
+ # $command. No tool is expected to read these values directly.
+ .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
+ .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
+ ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
+ ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
+
+ $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
+ $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
+ $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
+
if ($docker_hash)
{
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
- # Dynamically configure the container to use the host system as its
- # DNS server. Get the host's global addresses from the ip command,
- # and turn them into docker --dns options using gawk.
- $command .=
- q{$(ip -o address show scope global |
- gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
- $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
+ my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
+ # We only set memory limits if Docker lets us limit both memory and swap.
+ # Memory limits alone have been supported longer, but subprocesses tend
+ # to get SIGKILL if they exceed that without any swap limit set.
+ # See #5642 for additional background.
+ if ($docker_limitmem) {
+ $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
+ }
+
+ # The source tree and $destdir directory (which we have
+ # installed on the worker host) are available in the container,
+ # under the same path.
+ $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
+ $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
+
+ # Currently, we make the "by_pdh" directory in arv-mount's mount
+ # point appear at /keep inside the container (instead of using
+ # the same path as the host like we do with CRUNCH_SRC and
+ # CRUNCH_INSTALL). However, crunch scripts and utilities must
+ # not rely on this. They must use $TASK_KEEPMOUNT.
$command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
- $command .= "--env=\QHOME=/home/crunch\E ";
+ $ENV{TASK_KEEPMOUNT} = "/keep";
+
+ # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
+ $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
+ $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
+
+ # TASK_WORK is almost exactly like a docker data volume: it
+ # starts out empty, is writable, and persists until no
+ # containers use it any more. We don't use --volumes-from to
+ # share it with other containers: it is only accessible to this
+ # task, and it goes away when this task stops.
+ #
+ # However, a docker data volume is writable only by root unless
+ # the mount point already happens to exist in the container with
+ # different permissions. Therefore, we [1] assume /tmp already
+ # exists in the image and is writable by the crunch user; [2]
+ # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
+ # writable if they are created by docker while setting up the
+ # other --volumes); and [3] create $TASK_WORK inside the
+ # container using $build_script.
+ $command .= "--volume=/tmp ";
+ $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
+ $ENV{"HOME"} = $ENV{"TASK_WORK"};
+ $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
+
+ # TODO: Share a single JOB_WORK volume across all task
+ # containers on a given worker node, and delete it when the job
+ # ends (and, in case that doesn't work, when the next job
+ # starts).
+ #
+ # For now, use the same approach as TASK_WORK above.
+ $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+
+ # Bind mount the crunchrunner binary and host TLS certificates file into
+ # the container.
+ $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
+ $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
+
while (my ($env_key, $env_val) = each %ENV)
{
- if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
- if ($env_key eq "TASK_WORK") {
- $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
- }
- elsif ($env_key eq "TASK_KEEPMOUNT") {
- $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
- }
- else {
- $command .= "--env=\Q$env_key=$env_val\E ";
- }
+ if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
+ $command .= "--env=\Q$env_key=$env_val\E ";
}
}
- $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
- $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
+ $command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
- $command .= "stdbuf --output=0 --error=0 ";
- $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
+
+ if ($Job->{arvados_sdk_version}) {
+ $command .= $stdbuf;
+ $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+ } else {
+ $command .= "/bin/sh -c \'python -c " .
+ '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
+ ">&2 2>/dev/null; " .
+ "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+ "if which stdbuf >/dev/null ; then " .
+ " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " else " .
+ " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " fi\'";
+ }
} else {
# Non-docker run
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
- $command .= "stdbuf --output=0 --error=0 ";
- $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
+ $command .= $stdbuf;
+ $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
my @execargs = ('bash', '-c', $command);
- srun (\@srunargs, \@execargs, undef, $build_script_to_send);
+ srun (\@srunargs, \@execargs, undef, $build_script);
# exec() failed, we assume nothing happened.
- Log(undef, "srun() failed on build script");
- die;
+ die "srun() failed on build script\n";
}
close("writer");
if (!defined $childpid)
next;
}
shift @freeslot;
- $proc{$childpid} = { jobstep => $id,
- time => time,
- slot => $childslot,
- jobstepname => "$job_id.$id.$childpid",
- };
+ $proc{$childpid} = {
+ jobstepidx => $id,
+ time => time,
+ slot => $childslot,
+ jobstepname => "$job_id.$id.$childpid",
+ };
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
$Jobstep->{slotindex} = $childslot;
delete $Jobstep->{stderr};
delete $Jobstep->{finishtime};
+ delete $Jobstep->{tempfail};
$Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
$Jobstep->{'arvados_task'}->save;
while (!@freeslot
||
- (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+ ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
last THISROUND if $main::please_freeze;
if ($main::please_info)
{
$main::please_info = 0;
freeze();
- collate_output();
+ create_output_collection();
save_meta(1);
update_progress_stats();
}
my $gotsome
= readfrompipes ()
+ reapchildren ();
- if (!$gotsome)
+ if (!$gotsome || ($latest_refresh + 2 < scalar time))
{
check_refresh_wanted();
check_squeue();
update_progress_stats();
- select (undef, undef, undef, 0.1);
}
- elsif (time - $progress_stats_updated >= 30)
+ elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
{
update_progress_stats();
}
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
+ $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
+ $_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
{
}
# give up if no nodes are succeeding
- if (!grep { $_->{node}->{losing_streak} == 0 &&
- $_->{node}->{hold_count} < 4 } @slot) {
- my $message = "Every node has failed -- giving up on this round";
- Log (undef, $message);
+ if ($working_slot_count < 1) {
+ Log(undef, "Every node has failed -- giving up");
last THISROUND;
}
}
$main::please_continue = 0;
goto THISROUND;
}
- $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+ $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
readfrompipes ();
if (!reapchildren())
{
if (!defined $main::success)
{
- if (@jobstep_todo &&
- $thisround_succeeded == 0 &&
- ($thisround_failed == 0 || $thisround_failed > 4))
- {
+ if (!@jobstep_todo) {
+ $main::success = 1;
+ } elsif ($working_slot_count < 1) {
+ save_output_collection();
+ save_meta();
+ exit(EX_RETRY_UNLOCKED);
+ } elsif ($thisround_succeeded == 0 &&
+ ($thisround_failed == 0 || $thisround_failed > 4)) {
my $message = "stop because $thisround_failed tasks failed and none succeeded";
Log (undef, $message);
$main::success = 0;
}
- if (!@jobstep_todo)
- {
- $main::success = 1;
- }
}
goto ONELEVEL if !defined $main::success;
release_allocation();
freeze();
-my $collated_output = &collate_output();
-
-if (!$collated_output) {
- Log(undef, "output undef");
-}
-else {
- eval {
- open(my $orig_manifest, '-|', 'arv-get', $collated_output)
- or die "failed to get collated manifest: $!";
- my $orig_manifest_text = '';
- while (my $manifest_line = <$orig_manifest>) {
- $orig_manifest_text .= $manifest_line;
- }
- my $output = retry_op(sub {
- $arv->{'collections'}->{'create'}->execute(
- 'collection' => {'manifest_text' => $orig_manifest_text});
- });
- Log(undef, "output uuid " . $output->{uuid});
- Log(undef, "output hash " . $output->{portable_data_hash});
- $Job->update_attributes('output' => $output->{portable_data_hash});
- };
- if ($@) {
- Log (undef, "Failed to register output manifest: $@");
- }
-}
-
+my $collated_output = save_output_collection();
Log (undef, "finish");
save_meta();
$progress_stats_updated = time;
return if !$progress_is_dirty;
my ($todo, $done, $running) = (scalar @jobstep_todo,
- scalar @jobstep_done,
- scalar @slot - scalar @freeslot - scalar @holdslot);
+ scalar @jobstep_done,
+ scalar keys(%proc));
$Job->{'tasks_summary'} ||= {};
$Job->{'tasks_summary'}->{'todo'} = $todo;
$Job->{'tasks_summary'}->{'done'} = $done;
sub reapchildren
{
- my $pid = waitpid (-1, WNOHANG);
- return 0 if $pid <= 0;
-
- my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
- . "."
- . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
- my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
-
- my $childstatus = $?;
- my $exitvalue = $childstatus >> 8;
- my $exitinfo = "exit ".exit_status_s($childstatus);
- $Jobstep->{'arvados_task'}->reload;
- my $task_success = $Jobstep->{'arvados_task'}->{success};
-
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
-
- if (!defined $task_success) {
- # task did not indicate one way or the other --> fail
- $Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
- $task_success = 0;
- }
+ my $children_reaped = 0;
+ my @successful_task_uuids = ();
- if (!$task_success)
+ while((my $pid = waitpid (-1, WNOHANG)) > 0)
{
- my $temporary_fail;
- $temporary_fail ||= $Jobstep->{node_fail};
- $temporary_fail ||= ($exitvalue == 111);
-
- ++$thisround_failed;
- ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
- # Check for signs of a failed or misconfigured node
- if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
- 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
- # Don't count this against jobstep failure thresholds if this
- # node is already suspected faulty and srun exited quickly
- if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
- $elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
- $slot[$proc{$pid}->{slot}]->{node}->{name});
- $temporary_fail ||= 1;
- }
- ban_node_by_slot($proc{$pid}->{slot});
+ my $childstatus = $?;
+
+ my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+ . "."
+ . $slot[$proc{$pid}->{slot}]->{cpu});
+ my $jobstepidx = $proc{$pid}->{jobstepidx};
+
+ if (!WIFEXITED($childstatus))
+ {
+ # child did not exit (may be temporarily stopped)
+ Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
+ next;
}
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary ' : 'permanent',
- $elapsed));
+ $children_reaped++;
+ my $elapsed = time - $proc{$pid}->{time};
+ my $Jobstep = $jobstep[$jobstepidx];
+
+ my $exitvalue = $childstatus >> 8;
+ my $exitinfo = "exit ".exit_status_s($childstatus);
+ $Jobstep->{'arvados_task'}->reload;
+ my $task_success = $Jobstep->{'arvados_task'}->{success};
+
+ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+
+ if (!defined $task_success) {
+ # task did not indicate one way or the other --> fail
+ Log($jobstepidx, sprintf(
+ "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $task_success = 0;
+ }
- if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
- # Give up on this task, and the whole job
- $main::success = 0;
- $main::please_freeze = 1;
+ if (!$task_success)
+ {
+ my $temporary_fail;
+ $temporary_fail ||= $Jobstep->{tempfail};
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+ ++$thisround_failed;
+ ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+ # Check for signs of a failed or misconfigured node
+ if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+ 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+ # Don't count this against jobstep failure thresholds if this
+ # node is already suspected faulty and srun exited quickly
+ if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+ $elapsed < 5) {
+ Log ($jobstepidx, "blaming failure on suspect node " .
+ $slot[$proc{$pid}->{slot}]->{node}->{name});
+ $temporary_fail ||= 1;
+ }
+ ban_node_by_slot($proc{$pid}->{slot});
+ }
+
+ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+ ++$Jobstep->{'failures'},
+ $temporary_fail ? 'temporary' : 'permanent',
+ $elapsed));
+
+ if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+ # Give up on this task, and the whole job
+ $main::success = 0;
+ }
+ # Put this task back on the todo queue
+ push @jobstep_todo, $jobstepidx;
+ $Job->{'tasks_summary'}->{'failed'}++;
}
- # Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
- $Job->{'tasks_summary'}->{'failed'}++;
- }
- else
- {
- ++$thisround_succeeded;
- $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
- }
- $Jobstep->{exitcode} = $childstatus;
- $Jobstep->{finishtime} = time;
- $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+ else # task_success
+ {
+ push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
+ ++$thisround_succeeded;
+ $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
+ push @jobstep_done, $jobstepidx;
+ Log ($jobstepidx, "success in $elapsed seconds");
+ }
+ $Jobstep->{exitcode} = $childstatus;
+ $Jobstep->{finishtime} = time;
+ $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+ $Jobstep->{'arvados_task'}->save;
+ process_stderr_final ($jobstepidx);
+ Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
+
+ close $reader{$jobstepidx};
+ delete $reader{$jobstepidx};
+ delete $slot[$proc{$pid}->{slot}]->{pid};
+ push @freeslot, $proc{$pid}->{slot};
+ delete $proc{$pid};
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
- delete $slot[$proc{$pid}->{slot}]->{pid};
- push @freeslot, $proc{$pid}->{slot};
- delete $proc{$pid};
+ $progress_is_dirty = 1;
+ }
- if ($task_success) {
+ if (scalar(@successful_task_uuids) > 0)
+ {
+ Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
# Load new tasks
my $newtask_list = [];
my $newtask_results;
do {
- $newtask_results = retry_op(sub {
- $arv->{'job_tasks'}->{'list'}->execute(
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- });
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
push(@$newtask_list, @{$newtask_results->{items}});
} while (@{$newtask_results->{items}});
+ Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
foreach my $arvados_task (@$newtask_list) {
my $jobstep = {
'level' => $arvados_task->{'sequence'},
}
}
- $progress_is_dirty = 1;
- 1;
+ return $children_reaped;
}
sub check_refresh_wanted
{
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
- if (@stat && $stat[9] > $latest_refresh) {
+ if (@stat &&
+ $stat[9] > $latest_refresh &&
+ # ...and we have actually locked the job record...
+ $job_id eq $Job->{'uuid'}) {
$latest_refresh = scalar time;
- my $Job2 = retry_op(sub {
- $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
- });
+ my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
'cancelled_by_user_uuid',
'cancelled_by_client_uuid',
sub check_squeue
{
- # return if the kill list was checked <4 seconds ago
- if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
+ my $last_squeue_check = $squeue_checked;
+
+ # Do not call `squeue` or check the kill list more than once every
+ # 15 seconds.
+ return if $last_squeue_check > time - 15;
+ $squeue_checked = time;
+
+ # Look for children from which we haven't received stderr data since
+ # the last squeue check. If no such children exist, all procs are
+ # alive and there's no need to even look at squeue.
+ #
+ # As long as the crunchstat poll interval (10s) is shorter than the
+ # squeue check interval (15s) this should make the squeue check an
+ # infrequent event.
+ my $silent_procs = 0;
+ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
{
- return;
+ if (!exists($js->{stderr_at}))
+ {
+ $js->{stderr_at} = 0;
+ }
+ if ($js->{stderr_at} < $last_squeue_check)
+ {
+ $silent_procs++;
+ }
}
- $squeue_kill_checked = time;
+ return if $silent_procs == 0;
# use killem() on procs whose killtime is reached
- for (keys %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if (exists $proc{$_}->{killtime}
- && $proc{$_}->{killtime} <= time)
+ my $js = $jobstep[$procinfo->{jobstepidx}];
+ if (exists $procinfo->{killtime}
+ && $procinfo->{killtime} <= time
+ && $js->{stderr_at} < $last_squeue_check)
{
- killem ($_);
+ my $sincewhen = "";
+ if ($js->{stderr_at}) {
+ $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
+ }
+ Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ killem ($pid);
}
}
- # return if the squeue was checked <60 seconds ago
- if (defined $squeue_checked && $squeue_checked > time - 60)
- {
- return;
- }
- $squeue_checked = time;
-
if (!$have_slurm)
{
# here is an opportunity to check for mysterious problems with local procs
return;
}
- # get a list of steps still running
- my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
- chop @squeue;
- if ($squeue[-1] ne "ok")
+ # Get a list of steps still running. Note: squeue(1) says --steps
+ # selects a format (which we override anyway) and allows us to
+ # specify which steps we're interested in (which we don't).
+ # Importantly, it also changes the meaning of %j from "job name" to
+ # "step name" and (although this isn't mentioned explicitly in the
+ # docs) switches from "one line per job" mode to "one line per step"
+ # mode. Without it, we'd just get a list of one job, instead of a
+ # list of N steps.
+ my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
+ if ($? != 0)
{
+ Log(undef, "warning: squeue exit status $? ($!)");
return;
}
- pop @squeue;
+ chop @squeue;
# which of my jobsteps are running, according to squeue?
my %ok;
- foreach (@squeue)
+ for my $jobstepname (@squeue)
{
- if (/^(\d+)\.(\d+) (\S+)/)
- {
- if ($1 eq $ENV{SLURM_JOBID})
- {
- $ok{$3} = 1;
- }
- }
+ $ok{$jobstepname} = 1;
}
- # which of my active child procs (>60s old) were not mentioned by squeue?
- foreach (keys %proc)
+ # Check for child procs >60s old and not mentioned by squeue.
+ while (my ($pid, $procinfo) = each %proc)
{
- if ($proc{$_}->{time} < time - 60
- && !exists $ok{$proc{$_}->{jobstepname}}
- && !exists $proc{$_}->{killtime})
+ if ($procinfo->{time} < time - 60
+ && $procinfo->{jobstepname}
+ && !exists $ok{$procinfo->{jobstepname}}
+ && !exists $procinfo->{killtime})
{
- # kill this proc if it hasn't exited in 30 seconds
- $proc{$_}->{killtime} = time + 30;
+ # According to slurm, this task has ended (successfully or not)
+ # -- but our srun child hasn't exited. First we must wait (30
+ # seconds) in case this is just a race between communication
+ # channels. Then, if our srun child process still hasn't
+ # terminated, we'll conclude some slurm communication
+ # error/delay has caused the task to die without notifying srun,
+ # and we'll kill srun ourselves.
+ $procinfo->{killtime} = time + 30;
+ Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
if ($have_slurm)
{
Log (undef, "release job allocation");
- system "scancel $ENV{SLURM_JOBID}";
+ system "scancel $ENV{SLURM_JOB_ID}";
}
}
sub readfrompipes
{
my $gotsome = 0;
- foreach my $job (keys %reader)
+ my %fd_job;
+ my $sel = IO::Select->new();
+ foreach my $jobstepidx (keys %reader)
+ {
+ my $fd = $reader{$jobstepidx};
+ $sel->add($fd);
+ $fd_job{$fd} = $jobstepidx;
+
+ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+ $sel->add($stdout_fd);
+ $fd_job{$stdout_fd} = $jobstepidx;
+ }
+ }
+ # select on all reader fds with 0.1s timeout
+ my @ready_fds = $sel->can_read(0.1);
+ foreach my $fd (@ready_fds)
{
my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
+ if (0 < sysread ($fd, $buf, 65536))
{
+ $gotsome = 1;
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- $jobstep[$job]->{stderr} .= $buf;
- preprocess_stderr ($job);
- if (length ($jobstep[$job]->{stderr}) > 16384)
+
+ my $jobstepidx = $fd_job{$fd};
+ if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
+ $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ next;
+ }
+
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
+
+ # Consume everything up to the last \n
+ preprocess_stderr ($jobstepidx);
+
+ if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
{
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ # If we get a lot of stderr without a newline, chop off the
+ # front to avoid letting our buffer grow indefinitely.
+ substr ($jobstep[$jobstepidx]->{stderr},
+ 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
}
- $gotsome = 1;
}
}
return $gotsome;
}
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
sub preprocess_stderr
{
- my $job = shift;
+ my $jobstepidx = shift;
- while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+ while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
- substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
- Log ($job, "stderr $line");
+ substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+ Log ($jobstepidx, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
- $jobstep[$job]->{node_fail} = 1;
- ban_node_by_slot($jobstep[$job]->{slotindex});
+ elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
+ # Skip the following tempfail checks if this srun proc isn't
+ # attached to a particular worker slot.
+ }
+ elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+ my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
+ $slot[$job_slot_index]->{node}->{fail_count}++;
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ ban_node_by_slot($job_slot_index);
+ }
+ elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
+ }
+ elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
+ $jobstep[$jobstepidx]->{tempfail} = 1;
}
}
}
-sub process_stderr
+sub process_stderr_final
{
- my $job = shift;
- my $task_success = shift;
- preprocess_stderr ($job);
+ my $jobstepidx = shift;
+ preprocess_stderr ($jobstepidx);
map {
- Log ($job, "stderr $_");
- } split ("\n", $jobstep[$job]->{stderr});
+ Log ($jobstepidx, "stderr $_");
+ } split ("\n", $jobstep[$jobstepidx]->{stderr});
+ $jobstep[$jobstepidx]->{stderr} = '';
}
sub fetch_block
{
my $hash = shift;
- my ($keep, $child_out, $output_block);
-
- my $cmd = "arv-get \Q$hash\E";
- open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
- $output_block = '';
+ my $keep;
+ if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
+ Log(undef, "fetch_block run error from arv-get $hash: $!");
+ return undef;
+ }
+ my $output_block = "";
while (1) {
my $buf;
my $bytes = sysread($keep, $buf, 1024 * 1024);
if (!defined $bytes) {
- die "reading from arv-get: $!";
+ Log(undef, "fetch_block read error from arv-get: $!");
+ $output_block = undef;
+ last;
} elsif ($bytes == 0) {
# sysread returns 0 at the end of the pipe.
last;
}
}
close $keep;
+ if ($?) {
+ Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
+ $output_block = undef;
+ }
return $output_block;
}
-sub collate_output
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
+sub create_output_collection
{
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
- '--retries', retry_count());
- my $joboutput;
+ my $pid = open2($child_out, $child_in, 'python', '-c', q{
+import arvados
+import sys
+print (arvados.api("v1").collections().
+ create(body={"manifest_text": sys.stdin.read()}).
+ execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
+}, retry_count());
+
+ my $task_idx = -1;
+ my $manifest_size = 0;
for (@jobstep)
{
- next if (!exists $_->{'arvados_task'}->{'output'} ||
- !$_->{'arvados_task'}->{'success'});
+ ++$task_idx;
my $output = $_->{'arvados_task'}->{output};
- if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
- {
- $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
- print $child_in $output;
- }
- elsif (@jobstep == 1)
- {
- $joboutput = $output;
- last;
- }
- elsif (defined (my $outblock = fetch_block ($output)))
- {
- $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
- print $child_in $outblock;
+ next if (!defined($output));
+ my $next_write;
+ if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
+ $next_write = fetch_block($output);
+ } else {
+ $next_write = $output;
}
- else
- {
- Log (undef, "XXX fetch_block($output) failed XXX");
+ if (defined($next_write)) {
+ if (!defined(syswrite($child_in, $next_write))) {
+ # There's been an error writing. Stop the loop.
+ # We'll log details about the exit code later.
+ last;
+ } else {
+ $manifest_size += length($next_write);
+ }
+ } else {
+ my $uuid = $_->{'arvados_task'}->{'uuid'};
+ Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
$main::success = 0;
}
}
- $child_in->close;
+ close($child_in);
+ Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
- if (!defined $joboutput) {
- my $s = IO::Select->new($child_out);
- if ($s->can_read(120)) {
- sysread($child_out, $joboutput, 64 * 1024 * 1024);
- chomp($joboutput);
- # TODO: Ensure exit status == 0.
+ my $joboutput;
+ my $s = IO::Select->new($child_out);
+ if ($s->can_read(120)) {
+ sysread($child_out, $joboutput, 1024 * 1024);
+ waitpid($pid, 0);
+ if ($?) {
+ Log(undef, "output collection creation exited " . exit_status_s($?));
+ $joboutput = undef;
} else {
- Log (undef, "timed out reading from 'arv-put'");
+ chomp($joboutput);
+ }
+ } else {
+ Log (undef, "timed out while creating output collection");
+ foreach my $signal (2, 2, 2, 15, 15, 9) {
+ kill($signal, $pid);
+ last if waitpid($pid, WNOHANG) == -1;
+ sleep(1);
}
}
- # TODO: kill $pid instead of waiting, now that we've decided to
- # ignore further output.
- waitpid($pid, 0);
+ close($child_out);
return $joboutput;
}
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, save that as the output in the job record.
+sub save_output_collection {
+ my $collated_output = create_output_collection();
+
+ if (!$collated_output) {
+ Log(undef, "Failed to write output collection");
+ }
+ else {
+ Log(undef, "job output $collated_output");
+ $Job->update_attributes('output' => $collated_output);
+ }
+ return $collated_output;
+}
sub killem
{
}
if (!exists $proc{$_}->{"sent_$sig"})
{
- Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+ Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
kill $sig, $_;
select (undef, undef, undef, 0.1);
if ($sig == 2)
# Send log output to Keep via arv-put.
#
# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
# $log_pipe_pid is the pid of the arv-put subprocess.
#
# The only functions that should access these variables directly are:
# Starts an arv-put pipe, reading data on stdin and writing it to
# a $logfilename file in an output collection.
#
+# log_writer_read_output([$timeout])
+# Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+# Passes $timeout to the select() call, with a default of 0.01.
+# Returns the result of the last read() call on $log_pipe_out, or
+# -1 if read() wasn't called because select() timed out.
+# Only other log_writer_* functions should need to call this.
+#
# log_writer_send($txt)
# Writes $txt to the output log collection.
#
# Returns a true value if there is currently a live arv-put
# process, false otherwise.
#
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+ $log_pipe_pid);
sub log_writer_start($)
{
my $logfilename = shift;
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
- 'arv-put', '--portable-data-hash',
+ 'arv-put',
+ '--stream',
'--retries', '3',
'--filename', $logfilename,
'-');
+ $log_pipe_out_buf = "";
+ $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+ my $timeout = shift || 0.01;
+ my $read = -1;
+ while ($read && $log_pipe_out_select->can_read($timeout)) {
+ $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+ length($log_pipe_out_buf));
+ }
+ if (!defined($read)) {
+ Log(undef, "error reading log manifest from arv-put: $!");
+ }
+ return $read;
}
sub log_writer_send($)
{
my $txt = shift;
print $log_pipe_in $txt;
+ log_writer_read_output();
}
sub log_writer_finish()
return unless $log_pipe_pid;
close($log_pipe_in);
- my $arv_put_output;
- my $s = IO::Select->new($log_pipe_out);
- if ($s->can_read(120)) {
- sysread($log_pipe_out, $arv_put_output, 1024);
- chomp($arv_put_output);
- } else {
+ my $logger_failed = 0;
+ my $read_result = log_writer_read_output(120);
+ if ($read_result == -1) {
+ $logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
+ } elsif ($read_result != 0) {
+ $logger_failed = -2;
+ Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
- $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
if ($?) {
- Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+ $logger_failed ||= $?;
+ Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
}
+ close($log_pipe_out);
+ my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
+ $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+ $log_pipe_out_select = undef;
+
return $arv_put_output;
}
return $log_pipe_pid;
}
-sub Log # ($jobstep_id, $logmessage)
+sub Log # ($jobstepidx, $logmessage)
{
- if ($_[1] =~ /\n/) {
+ my ($jobstepidx, $logmessage) = @_;
+ if ($logmessage =~ /\n/) {
for my $line (split (/\n/, $_[1])) {
- Log ($_[0], $line);
+ Log ($jobstepidx, $line);
}
return;
}
my $fh = select STDERR; $|=1; select $fh;
- my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+ my $task_qseq = '';
+ if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+ $task_qseq = $jobstepidx;
+ }
+ my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
my $message = "@_ at $file line $line\n";
Log (undef, $message);
freeze() if @jobstep_todo;
- collate_output() if @jobstep_todo;
+ create_output_collection() if @jobstep_todo;
cleanup();
save_meta();
die;
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
+ my $log_manifest = log_writer_finish();
+ return unless defined($log_manifest);
- my $loglocator = log_writer_finish();
- Log (undef, "log manifest is $loglocator");
- $Job->{'log'} = $loglocator;
- $Job->update_attributes('log', $loglocator);
+ if ($Job->{log}) {
+ my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+ $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
+ }
+
+ my $log_coll = api_call(
+ "collections/create", ensure_unique_name => 1, collection => {
+ manifest_text => $log_manifest,
+ owner_uuid => $Job->{owner_uuid},
+ name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+ });
+ Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+ $Job->update_attributes('log' => $log_coll->{portable_data_hash});
}
}
}
freeze();
- collate_output();
+ create_output_collection();
cleanup();
save_meta();
exit 1;
}
+sub srun_sync
+{
+ my $srunargs = shift;
+ my $execargs = shift;
+ my $opts = shift || {};
+ my $stdin = shift;
+
+ my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+ Log (undef, "$label: start");
+
+ my ($stderr_r, $stderr_w);
+ pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+ my ($stdout_r, $stdout_w);
+ pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+ my $srunpid = fork();
+ if ($srunpid == 0)
+ {
+ close($stderr_r);
+ close($stdout_r);
+ fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+ fcntl($stdout_w, F_SETFL, 0) or croak($!);
+ open(STDERR, ">&", $stderr_w);
+ open(STDOUT, ">&", $stdout_w);
+ srun ($srunargs, $execargs, $opts, $stdin);
+ exit (1);
+ }
+ close($stderr_w);
+ close($stdout_w);
+
+ set_nonblocking($stderr_r);
+ set_nonblocking($stdout_r);
+
+ # Add entries to @jobstep and %proc so check_squeue() and
+ # freeze_if_want_freeze() can treat it like a job task process.
+ push @jobstep, {
+ stderr => '',
+ stderr_at => 0,
+ stderr_captured => '',
+ stdout_r => $stdout_r,
+ stdout_captured => '',
+ };
+ my $jobstepidx = $#jobstep;
+ $proc{$srunpid} = {
+ jobstepidx => $jobstepidx,
+ };
+ $reader{$jobstepidx} = $stderr_r;
+
+ while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+ my $busy = readfrompipes();
+ if (!$busy || ($latest_refresh + 2 < scalar time)) {
+ check_refresh_wanted();
+ check_squeue();
+ }
+ if (!$busy) {
+ select(undef, undef, undef, 0.1);
+ }
+ killem(keys %proc) if $main::please_freeze;
+ }
+ my $exited = $?;
+
+ 1 while readfrompipes();
+ process_stderr_final ($jobstepidx);
+
+ Log (undef, "$label: exit ".exit_status_s($exited));
+
+ close($stdout_r);
+ close($stderr_r);
+ delete $proc{$srunpid};
+ delete $reader{$jobstepidx};
+
+ my $j = pop @jobstep;
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
sub srun
{
my $srunargs = shift;
my $opts = shift || {};
my $stdin = shift;
my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
- print STDERR (join (" ",
- map { / / ? "'$_'" : $_ }
- (@$args)),
- "\n")
- if $ENV{CRUNCH_DEBUG};
+
+ $Data::Dumper::Terse = 1;
+ $Data::Dumper::Indent = 0;
+ my $show_cmd = Dumper($args);
+ $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
+ $show_cmd =~ s/\n/ /g;
+ if ($opts->{fork}) {
+ Log(undef, "starting: $show_cmd");
+ } else {
+ # This is a child process: parent is in charge of reading our
+ # stderr and copying it to Log() if needed.
+ warn "starting: $show_cmd\n";
+ }
if (defined $stdin) {
my $child = open STDIN, "-|";
# If not, return undef for both values.
my $locator = shift;
my ($streamname, $filename);
- my $image = retry_op(sub {
- $arv->{collections}->{get}->execute(uuid => $locator);
- });
+ my $image = api_call("collections/get", uuid => $locator);
if ($image) {
foreach my $line (split(/\n/, $image->{manifest_text})) {
my @tokens = split(/\s+/, $line);
}
sub retry_op {
- # Given a function reference, call it with the remaining arguments. If
- # it dies, retry it with exponential backoff until it succeeds, or until
- # the current retry_count is exhausted.
+ # Pass in two function references.
+ # This method will be called with the remaining arguments.
+ # If it dies, retry it with exponential backoff until it succeeds,
+ # or until the current retry_count is exhausted. After each failure
+ # that can be retried, the second function will be called with
+ # the current try count (0-based), next try time, and error message.
my $operation = shift;
+ my $retry_callback = shift;
my $retries = retry_count();
foreach my $try_count (0..$retries) {
my $next_try = time + (2 ** $try_count);
if (!$@) {
return $result;
} elsif ($try_count < $retries) {
+ $retry_callback->($try_count, $next_try, $@);
my $sleep_time = $next_try - time;
sleep($sleep_time) if ($sleep_time > 0);
}
die($@ . "\n");
}
+sub api_call {
+ # Pass in a /-separated API method name, and arguments for it.
+ # This function will call that method, retrying as needed until
+ # the current retry_count is exhausted, with a log on the first failure.
+ my $method_name = shift;
+ my $log_api_retry = sub {
+ my ($try_count, $next_try_at, $errmsg) = @_;
+ $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+ $errmsg =~ s/\s/ /g;
+ $errmsg =~ s/\s+$//;
+ my $retry_msg;
+ if ($next_try_at < time) {
+ $retry_msg = "Retrying.";
+ } else {
+ my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+ $retry_msg = "Retrying at $next_try_fmt.";
+ }
+ Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+ };
+ my $method = $arv;
+ foreach my $key (split(/\//, $method_name)) {
+ $method = $method->{$key};
+ }
+ return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
sub exit_status_s {
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".
return $s;
}
-__DATA__
-#!/usr/bin/perl
+sub handle_readall {
+ # Pass in a glob reference to a file handle.
+ # Read all its contents and return them as a string.
+ my $fh_glob_ref = shift;
+ local $/ = undef;
+ return <$fh_glob_ref>;
+}
-# checkout-and-build
+sub tar_filename_n {
+ my $n = shift;
+ return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
+}
+
+sub add_git_archive {
+ # Pass in a git archive command as a string or list, a la system().
+ # This method will save its output to be included in the archive sent to the
+ # build script.
+ my $git_input;
+ $git_tar_count++;
+ if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+ croak("Failed to save git archive: $!");
+ }
+ my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+ close($git_input);
+ waitpid($git_pid, 0);
+ close(GIT_ARCHIVE);
+ if ($?) {
+ croak("Failed to save git archive: git exited " . exit_status_s($?));
+ }
+}
+
+sub combined_git_archive {
+ # Combine all saved tar archives into a single archive, then return its
+ # contents in a string. Return undef if no archives have been saved.
+ if ($git_tar_count < 1) {
+ return undef;
+ }
+ my $base_tar_name = tar_filename_n(1);
+ foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
+ my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
+ if ($tar_exit != 0) {
+ croak("Error preparing build archive: tar -A exited " .
+ exit_status_s($tar_exit));
+ }
+ }
+ if (!open(GIT_TAR, "<", $base_tar_name)) {
+ croak("Could not open build archive: $!");
+ }
+ my $tar_contents = handle_readall(\*GIT_TAR);
+ close(GIT_TAR);
+ return $tar_contents;
+}
+
+sub set_nonblocking {
+ my $fh = shift;
+ my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
+ fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+}
+
+__DATA__
+#!/usr/bin/env perl
+#
+# This is crunch-job's internal dispatch script. crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally. It gets called in two modes:
+#
+# * No arguments: Installation mode. Read a tar archive from the DATA
+# file handle; it includes the Crunch script's source code, and
+# maybe SDKs as well. Those should be installed in the proper
+# locations. This runs outside of any Docker container, so don't try to
+# introspect Crunch's runtime environment.
+#
+# * With arguments: Crunch script run mode. This script should set up the
+# environment, then run the command specified in the arguments. This runs
+# inside any Docker container.
use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+# Exit status passed to shell_or_die when installing the Python SDK into the
+# job virtualenv with pip fails.
+use constant TASK_TEMPFAIL => 111;
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+# Identifies the archive in our DATA section; compared against the
+# $destdir.archive_hash symlink to decide whether installation can be skipped.
+my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
+# SDKs are installed here; defaults to ./opt under the current directory.
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
+my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};
-for my $dir ($destdir, $task_work) {
- if ($dir) {
- make_path $dir;
- -e $dir or die "Failed to create temporary directory ($dir): $!";
- }
+# Keep copies of the original STDOUT/STDERR: installation mode later
+# redirects both to $destdir.log, while the run-mode logger and
+# shell_or_die's failure report write to the saved STDERR_ORIG.
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
+# Create whichever of the source, job, and task work directories are set.
+for my $dir ($destdir, $job_work, $task_work) {
+ if ($dir) {
+ make_path $dir;
+ -e $dir or die "Failed to create temporary directory ($dir): $!";
+ }
}
-open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
-flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
- if (@ARGV) {
- exec(@ARGV);
- die "Cannot exec `@ARGV`: $!";
+# Empty the task work directory (keep_root leaves the directory itself in
+# place) so each task starts with a clean one.
+if ($task_work) {
+ remove_tree($task_work, {keep_root => 1});
+}
+
+### Crunch script run mode
+if (@ARGV) {
+ # We want to do routine logging during task 0 only. This gives the user
+ # the information they need, but avoids repeating the information for every
+ # task.
+ # $Log takes a printf-style format followed by its arguments: pass variable
+ # data as arguments, never interpolated into the format string.
+ my $Log;
+ if ($ENV{TASK_SEQUENCE} eq "0") {
+ $Log = sub {
+ my $msg = shift;
+ printf STDERR_ORIG "[Crunch] $msg\n", @_;
+ };
+ } else {
+ $Log = sub { };
+ }
+
+ # Build a virtualenv containing the Python SDK once per job (under
+ # $job_work), if the SDK was installed and virtualenv is available.
+ my $python_src = "$install_dir/python";
+ my $venv_dir = "$job_work/.arvados.venv";
+ my $venv_built = -e "$venv_dir/bin/activate";
+ if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
+ shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
+ "--python=python2.7", $venv_dir);
+ shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+ $venv_built = 1;
+ $Log->("Built Python SDK virtualenv");
+ }
+
+ my @pysdk_version_cmd = ("python", "-c",
+ "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
+ if ($venv_built) {
+ $Log->("Running in Python SDK virtualenv");
+ @pysdk_version_cmd = ();
+ # Re-run the original command under /bin/sh with the virtualenv activated.
+ my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+ @ARGV = ("/bin/sh", "-ec",
+ ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
+ } elsif (-d $python_src) {
+ $Log->("Warning: virtualenv not found inside Docker container default " .
+ "\$PATH. Can't install Python SDK.");
+ }
+
+ if (@pysdk_version_cmd) {
+ # Report which Arvados Python SDK the script will see. If python can't be
+ # started, prints nothing, or exits nonzero, $pysdk_version stays undef
+ # and we log a warning instead. Checking open()'s result matters: without
+ # it, a stale $? from an earlier command could be taken as success.
+ my $pysdk_version;
+ if (open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd)) {
+ $pysdk_version = <$pysdk_version_pipe>;
+ # close() on a pipe returns false when the child exited nonzero.
+ close($pysdk_version_pipe) or undef $pysdk_version;
+ }
+ if (defined($pysdk_version)) {
+ chomp($pysdk_version);
+ $Log->("Using Arvados SDK version %s", $pysdk_version);
} else {
- exit 0;
+ # A lot could've gone wrong here, but pretty much all of it means that
+ # Python won't be able to load the Arvados SDK.
+ $Log->("Warning: Arvados SDK not found");
+ }
+ }
+
+ # Prepend each installed SDK library directory to its search-path
+ # environment variable (see %SDK_ENVVARS).
+ while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+ my $sdk_path = "$install_dir/$sdk_dir";
+ if (-d $sdk_path) {
+ if ($ENV{$sdk_envkey}) {
+ $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+ } else {
+ $ENV{$sdk_envkey} = $sdk_path;
+ }
+ $Log->("Arvados SDK added to %s", $sdk_envkey);
}
+ }
+
+ exec(@ARGV);
+ die "Cannot exec `@ARGV`: $!";
}
-unlink "$destdir.commit";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
+### Installation mode
+# Hold an exclusive lock on $destdir.lock for the whole install (released by
+# "close L" after the archive hash is recorded), so concurrent invocations
+# don't install into $destdir at the same time.
+open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
+flock L, LOCK_EX;
+if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
+ # This exact git archive (source + arvados sdk) is already installed
+ # here, so there's no need to reinstall it.
+ # We must consume our DATA section, though: otherwise the process
+ # feeding it to us will get SIGPIPE.
+ my $buf;
+ while (read(DATA, $buf, 65536)) { }
+
+ exit(0);
+}
+
+# Drop the "installed" marker before modifying $destdir, so an install that
+# dies partway through is not later mistaken for a complete one.
+unlink "$destdir.archive_hash";
mkdir $destdir;
-my @git_archive_data = <DATA>;
-if (@git_archive_data) {
- open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
- print TARX @git_archive_data;
+
+# Stream the archive from our DATA section into "tar -x" inside $destdir.
+do {
+ # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
+ local $SIG{PIPE} = "IGNORE";
+ warn "Extracting archive: $archive_hash\n";
+ # --ignore-zeros is necessary sometimes: depending on how much NUL
+ # padding tar -A put on our combined archive (which in turn depends
+ # on the length of the component archives) tar without
+ # --ignore-zeros will exit before consuming stdin and cause close()
+ # to fail on the resulting SIGPIPE.
+ if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
+ die "Error launching 'tar -xC $destdir': $!";
+ }
+ # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+ # get SIGPIPE. We must feed it data incrementally.
+ my $tar_input;
+ while (read(DATA, $tar_input, 65536)) {
+ print TARX $tar_input;
+ }
if(!close(TARX)) {
- die "'tar -C $destdir -xf -' exited $?: $!";
+ die "'tar -xC $destdir' exited $?: $!";
}
-}
+};
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
mkdir $install_dir;
-for my $src_path ("$destdir/arvados/sdk/python") {
- if (-d $src_path) {
- shell_or_die ("virtualenv", $install_dir);
- shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+# Move any SDKs bundled in the archive (under .arvados.sdk/sdk) into
+# $install_dir: the Python SDK, plus each language named by the first path
+# component of a %SDK_ENVVARS key (perl, ruby).
+# NOTE(review): rename() fails if the target already exists as a non-empty
+# directory (e.g. left over from installing a different archive), or if
+# $destdir and $install_dir are on different filesystems -- confirm neither
+# can happen here.
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+ foreach my $sdk_lang (("python",
+ map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
+ if (-d "$sdk_root/$sdk_lang") {
+ if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+ die "Failed to install $sdk_lang SDK: $!";
+ }
+ }
}
}
+# If the Python SDK was shipped in the archive, make sure its setup.py can
+# compute package metadata.  `setup.py egg_info` may fail because the SDK was
+# copied out of its git checkout and can't ask git for a build tag; in that
+# case, configure an empty build tag.  Any other egg_info failure is fatal.
+my $python_dir = "$install_dir/python";
+if ((-d $python_dir) and can_run("python2.7")) {
+  # Shell form: "2>&1 >/dev/null" sends stderr into the pipe and discards
+  # stdout, so we read only the error output.
+  open(my $egg_info_pipe, "-|",
+       "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null")
+    or die "Error launching python2.7 setup.py egg_info: $!";
+  my @egg_info_errors = <$egg_info_pipe>;
+  close($egg_info_pipe);
+  # Capture the exit status right away, before anything can clobber $?.
+  my $egg_info_status = $?;
+
+  if ($egg_info_status) {
+    if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or
+                              ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
+      # egg_info apparently failed because it couldn't ask git for a build tag.
+      # Specify no build tag.
+      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg")
+        or die "Failed to open $python_dir/setup.cfg: $!";
+      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+      close($pysdk_cfg)
+        or die "Failed to write $python_dir/setup.cfg: $!";
+    } else {
+      my $egg_info_exit = $egg_info_status >> 8;
+      foreach my $errline (@egg_info_errors) {
+        warn $errline;
+      }
+      warn "python setup.py egg_info failed: exit $egg_info_exit";
+      exit ($egg_info_exit || 1);
+    }
+  }
+}
+
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
+# Run the repository's install hook, if it has one.
+# NOTE(review): the install.sh and tests/autotests.sh checks below are
+# relative to the current directory, not $destdir -- confirm that is intended.
if (-e "$destdir/crunch_scripts/install") {
- shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+ shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
# Old version
- shell_or_die ("./tests/autotests.sh", $install_dir);
+ shell_or_die (undef, "./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
- shell_or_die ("./install.sh", $install_dir);
+ shell_or_die (undef, "./install.sh", $install_dir);
}
-if ($commit) {
- unlink "$destdir.commit.new";
- symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
- rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+# Record which archive is now installed: build the symlink under a temporary
+# name, then rename it over $destdir.archive_hash so the old marker is
+# replaced in a single step.
+if ($archive_hash) {
+ unlink "$destdir.archive_hash.new";
+ symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
+ rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
}
+# Release the install lock taken on $destdir.lock.
close L;
-if (@ARGV) {
- exec(@ARGV);
- die "Cannot exec `@ARGV`: $!";
-} else {
- exit 0;
+# Return true if `which` finds $command_name in $PATH.  The output of `which`
+# is drained and discarded; only its exit status matters.
+sub can_run {
+  my ($program) = @_;
+  open(my $which_out, "-|", "which", $program);
+  1 while <$which_out>;
+  close($which_out);
+  return $? == 0;
}
sub shell_or_die
{
+ # Run the command in @_ (everything after the leading $exitcode argument).
+ # On failure, switch STDERR back to the original stream, dump
+ # $destdir.log there, warn, and exit: with $exitcode if it is defined,
+ # otherwise with the command's exit code (or 1 if that is 0, e.g. the
+ # command was killed by a signal or could not be started).
+ my $exitcode = shift;
+
if ($ENV{"DEBUG"}) {
print STDERR "@_\n";
}
- system (@_) == 0
- or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+ if (system (@_) != 0) {
+ my $err = $!;
+ my $code = $?;
+ # system() returns -1 if the command could not be started at all; that is
+ # not a wait status, so don't decode it as one (-1 >> 8 is a huge number).
+ my $exitstatus = ($code == -1)
+ ? "failed to start"
+ : sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
+ my $cmd_exit = ($code == -1) ? 0 : ($code >> 8);
+ open STDERR, ">&STDERR_ORIG";
+ # Quote the log path for the shell, in case $destdir contains spaces or
+ # other shell metacharacters.
+ system ("cat \Q$destdir.log\E >&2");
+ warn "@_ failed ($err): $exitstatus";
+ if (defined($exitcode)) {
+ exit $exitcode;
+ }
+ else {
+ exit ($cmd_exit || 1);
+ }
+ }
}
__DATA__