my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
+my $cgroup_root = "/sys/fs/cgroup";
my $docker_bin = "docker.io";
+my $docker_run_args = "";
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job=s' => \$jobspec,
'job-api-token=s' => \$job_api_token,
'no-clear-tmp' => \$no_clear_tmp,
'resume-stash=s' => \$resume_stash,
+ 'cgroup-root=s' => \$cgroup_root,
'docker-bin=s' => \$docker_bin,
+ 'docker-run-args=s' => \$docker_run_args,
);
if (defined $job_api_token) {
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
- $cmd,
- {fork => 1});
-if ($? != 0) {
- Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+ $cmd,
+ {label => "sanity check"});
+if ($exited != 0) {
+ Log(undef, "Sanity check failed: ".exit_status_s($exited));
exit EX_TEMPFAIL;
}
Log(undef, "Sanity check OK");
my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
- # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
- Log (undef, "Clean work dirs");
-
- my $cleanpid = fork();
- if ($cleanpid == 0)
- {
- # Find FUSE mounts under $CRUNCH_TMP and unmount them.
- # Then clean up work directories.
- # TODO: When #5036 is done and widely deployed, we can limit mount's
- # -t option to simply fuse.keep.
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
- exit (1);
- }
- while (1)
- {
- last if $cleanpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($cleanpid);
- select (undef, undef, undef, 0.1);
- }
- if ($?) {
- Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+ # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
+ # up work directories crunch_tmp/work, crunch_tmp/opt,
+ # crunch_tmp/src*.
+ #
+ # TODO: When #5036 is done and widely deployed, we can limit mount's
+ # -t option to simply fuse.keep.
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ {label => "clean work dirs"});
+ if ($exited != 0) {
exit(EX_RETRY_UNLOCKED);
}
}
# If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
+ Log (undef, "Install docker image $docker_locator");
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{
croak("No Docker image hash found from locator $docker_locator");
}
+ Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if $docker_bin images -q --no-trunc --all | grep -xF \Q$docker_hash\E >/dev/null; then
+ exit 0
+fi
+declare -a exit_codes=("\${PIPESTATUS[@]}")
+if [ 0 != "\${exit_codes[0]}" ]; then
+ exit "\${exit_codes[0]}" # `docker images` failed
+elif [ 1 != "\${exit_codes[1]}" ]; then
+ exit "\${exit_codes[1]}" # `grep` encountered an error
+else
+ # Everything worked fine, but grep didn't find the image on this host.
+ arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
- }
- if ($? != 0)
+
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . join(',', @node)],
+ ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+ {label => "load docker image"});
+ if ($exited != 0)
{
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+ exit(EX_RETRY_UNLOCKED);
}
# Determine whether this version of Docker supports memory+swap limits.
- srun(["srun", "--nodelist=" . $node[0]],
- ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
- {fork => 1});
- $docker_limitmem = ($? == 0);
+ ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=1"],
+ [$docker_bin, 'run', '--help'],
+ {label => "check --memory-swap feature"});
+ $docker_limitmem = ($stdout =~ /--memory-swap/);
+
+ # Find a non-root Docker user to use.
+ # Tries the default user for the container, then 'crunch', then 'nobody',
+ # testing for whether the actual user id is non-zero. This defends against
+ # mistakes but not malice, but we intend to harden the security in the future
+ # so we don't want anyone getting used to their jobs running as root in their
+ # Docker containers.
+ my @tryusers = ("", "crunch", "nobody");
+ foreach my $try_user (@tryusers) {
+ my $label;
+ my $try_user_arg;
+ if ($try_user eq "") {
+ $label = "check whether default user is UID 0";
+ $try_user_arg = "";
+ } else {
+ $label = "check whether user '$try_user' is UID 0";
+ $try_user_arg = "--user=$try_user";
+ }
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=1"],
+ ["/bin/sh", "-ec",
+ "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+ {label => $label});
+ chomp($stdout);
+ if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
+ $dockeruserarg = $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Container will run with default user");
+ } else {
+ Log(undef, "Container will run with $dockeruserarg");
+ }
+ last;
+ }
+ }
+
+ if (!defined $dockeruserarg) {
+ croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+ }
if ($Job->{arvados_sdk_version}) {
# The job also specifies an Arvados SDK version. Add the SDKs to the
}
}
else {
- my $install_exited;
+ my $exited;
my $install_script_tries_left = 3;
for (my $attempts = 0; $attempts < 3; $attempts++) {
- Log(undef, "Run install script on all workers");
-
my @srunargs = ("srun",
"--nodelist=$nodelist",
"-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($install_stderr_r, $install_stderr_w);
- pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
- set_nonblocking($install_stderr_r);
- my $installpid = fork();
- if ($installpid == 0)
- {
- close($install_stderr_r);
- fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
- open(STDOUT, ">&", $install_stderr_w);
- open(STDERR, ">&", $install_stderr_w);
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- close($install_stderr_w);
- # Tell freeze_if_want_freeze how to kill the child, otherwise the
- # "waitpid(installpid)" loop won't get interrupted by a freeze:
- $proc{$installpid} = {};
- my $stderr_buf = '';
- # Track whether anything appears on stderr other than slurm errors
- # ("srun: ...") and the "starting: ..." message printed by the
- # srun subroutine itself:
+ my ($stdout, $stderr);
+ ($exited, $stdout, $stderr) = srun_sync(
+ \@srunargs, \@execargs,
+ {label => "run install script on all workers"},
+ $build_script . $git_archive);
+
my $stderr_anything_from_script = 0;
- my $match_our_own_errors = '^(srun: error: |starting: \[)';
- while ($installpid != waitpid(-1, WNOHANG)) {
- freeze_if_want_freeze ($installpid);
- # Wait up to 0.1 seconds for something to appear on stderr, then
- # do a non-blocking read.
- my $bits = fhbits($install_stderr_r);
- select ($bits, undef, $bits, 0.1);
- if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
- {
- while ($stderr_buf =~ /^(.*?)\n/) {
- my $line = $1;
- substr $stderr_buf, 0, 1+length($line), "";
- Log(undef, "stderr $line");
- if ($line !~ /$match_our_own_errors/) {
- $stderr_anything_from_script = 1;
- }
- }
- }
- }
- delete $proc{$installpid};
- $install_exited = $?;
- close($install_stderr_r);
- if (length($stderr_buf) > 0) {
- if ($stderr_buf !~ /$match_our_own_errors/) {
+ for my $line (split(/\n/, $stderr)) {
+ if ($line !~ /^(srun: error: |starting: \[)/) {
$stderr_anything_from_script = 1;
}
- Log(undef, "stderr $stderr_buf")
}
- Log (undef, "Install script exited ".exit_status_s($install_exited));
- last if $install_exited == 0 || $main::please_freeze;
+ last if $exited == 0 || $main::please_freeze;
+
# If the install script fails but doesn't print an error message,
# the next thing anyone is likely to do is just run it again in
# case it was a transient problem like "slurm communication fails
unlink($tar_filename);
}
- if ($install_exited != 0) {
+ if ($exited != 0) {
croak("Giving up");
}
}
@freeslot = (0..$#slot);
}
my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
my %round_max_slots = ();
for (my $ii = $#freeslot; $ii >= 0; $ii--) {
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
+ # Don't create new tasks if we already know the job's final result.
+ last if defined($main::success);
+
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
$ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
$ENV{"HOME"} = $ENV{"TASK_WORK"};
- $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
$ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
+ my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
+
$ENV{"GZIP"} = "-n";
my @srunargs = (
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
+
+ my $stdbuf = " stdbuf --output=0 --error=0 ";
+
+ my $arv_file_cache = "";
+ if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
+ $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
+ }
+
my $command =
- "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
- ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
- ."&& cd $ENV{CRUNCH_TMP} "
+ "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
+ ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
+ ."&& cd \Q$ENV{CRUNCH_TMP}\E "
# These environment variables get used explicitly later in
# $command. No tool is expected to read these values directly.
.q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
- ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
- $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+ ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
+ .q{&& declare -a VOLUMES=() }
+ .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
+ .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
+ .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
+
+ $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
+ $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
+ $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
+
if ($docker_hash)
{
- my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+ my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
# We only set memory limits if Docker lets us limit both memory and swap.
# Memory limits alone have been supported longer, but subprocesses tend
# to get SIGKILL if they exceed that without any swap limit set.
$command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
$command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
- # Currently, we make arv-mount's mount point appear at /keep
- # inside the container (instead of using the same path as the
- # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
- # crunch scripts and utilities must not rely on this. They must
- # use $TASK_KEEPMOUNT.
+ # Currently, we make the "by_pdh" directory in arv-mount's mount
+ # point appear at /keep inside the container (instead of using
+ # the same path as the host like we do with CRUNCH_SRC and
+ # CRUNCH_INSTALL). However, crunch scripts and utilities must
+ # not rely on this. They must use $TASK_KEEPMOUNT.
$command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
$ENV{TASK_KEEPMOUNT} = "/keep";
+ # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
+ $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
+ $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
+
# TASK_WORK is almost exactly like a docker data volume: it
# starts out empty, is writable, and persists until no
# containers use it any more. We don't use --volumes-from to
# For now, use the same approach as TASK_WORK above.
$ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+ # Bind mount the crunchrunner binary and host TLS certificates file into
+ # the container.
+ $command .= '"${VOLUMES[@]}" ';
+
while (my ($env_key, $env_val) = each %ENV)
{
if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
}
$command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
- $command .= "stdbuf --output=0 --error=0 ";
- $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+ if ($Job->{arvados_sdk_version}) {
+ $command .= $stdbuf;
+ $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+ } else {
+ $command .= "/bin/sh -c \'python -c " .
+ '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
+ ">&2 2>/dev/null; " .
+ "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+ "if which stdbuf >/dev/null ; then " .
+ " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " else " .
+ " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " fi\'";
+ }
} else {
# Non-docker run
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
- $command .= "stdbuf --output=0 --error=0 ";
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
+ $command .= $stdbuf;
$command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
next;
}
shift @freeslot;
- $proc{$childpid} = { jobstep => $id,
- time => time,
- slot => $childslot,
- jobstepname => "$job_id.$id.$childpid",
- };
+ $proc{$childpid} = {
+ jobstepidx => $id,
+ time => time,
+ slot => $childslot,
+ jobstepname => "$job_id.$id.$childpid",
+ };
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
||
($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
- last THISROUND if $main::please_freeze || defined($main::success);
+ last THISROUND if $main::please_freeze;
if ($main::please_info)
{
$main::please_info = 0;
my $gotsome
= readfrompipes ()
+ reapchildren ();
- if (!$gotsome)
+ if (!$gotsome || ($latest_refresh + 2 < scalar time))
{
check_refresh_wanted();
check_squeue();
update_progress_stats();
- select (undef, undef, undef, 0.1);
}
elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
{
update_progress_stats();
}
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
sub reapchildren
{
- my $pid = waitpid (-1, WNOHANG);
- return 0 if $pid <= 0;
-
- my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
- . "."
- . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
- my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
-
- my $childstatus = $?;
- my $exitvalue = $childstatus >> 8;
- my $exitinfo = "exit ".exit_status_s($childstatus);
- $Jobstep->{'arvados_task'}->reload;
- my $task_success = $Jobstep->{'arvados_task'}->{success};
-
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
-
- if (!defined $task_success) {
- # task did not indicate one way or the other --> fail
- $Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
- $task_success = 0;
- }
+ my $children_reaped = 0;
+ my @successful_task_uuids = ();
- if (!$task_success)
+ while((my $pid = waitpid (-1, WNOHANG)) > 0)
{
- my $temporary_fail;
- $temporary_fail ||= $Jobstep->{tempfail};
- $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-
- ++$thisround_failed;
- ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
- # Check for signs of a failed or misconfigured node
- if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
- 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
- # Don't count this against jobstep failure thresholds if this
- # node is already suspected faulty and srun exited quickly
- if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
- $elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
- $slot[$proc{$pid}->{slot}]->{node}->{name});
- $temporary_fail ||= 1;
- }
- ban_node_by_slot($proc{$pid}->{slot});
+ my $childstatus = $?;
+
+ my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+ . "."
+ . $slot[$proc{$pid}->{slot}]->{cpu});
+ my $jobstepidx = $proc{$pid}->{jobstepidx};
+
+ $children_reaped++;
+ my $elapsed = time - $proc{$pid}->{time};
+ my $Jobstep = $jobstep[$jobstepidx];
+
+ my $exitvalue = $childstatus >> 8;
+ my $exitinfo = "exit ".exit_status_s($childstatus);
+ $Jobstep->{'arvados_task'}->reload;
+ my $task_success = $Jobstep->{'arvados_task'}->{success};
+
+ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+
+ if (!defined $task_success) {
+ # task did not indicate one way or the other --> fail
+ Log($jobstepidx, sprintf(
+ "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $task_success = 0;
}
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary' : 'permanent',
- $elapsed));
+ if (!$task_success)
+ {
+ my $temporary_fail;
+ $temporary_fail ||= $Jobstep->{tempfail};
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+ ++$thisround_failed;
+ ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+ # Check for signs of a failed or misconfigured node
+ if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+ 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+ # Don't count this against jobstep failure thresholds if this
+ # node is already suspected faulty and srun exited quickly
+ if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+ $elapsed < 5) {
+ Log ($jobstepidx, "blaming failure on suspect node " .
+ $slot[$proc{$pid}->{slot}]->{node}->{name});
+ $temporary_fail ||= 1;
+ }
+ ban_node_by_slot($proc{$pid}->{slot});
+ }
+
+ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+ ++$Jobstep->{'failures'},
+ $temporary_fail ? 'temporary' : 'permanent',
+ $elapsed));
- if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
- # Give up on this task, and the whole job
- $main::success = 0;
+ if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+ # Give up on this task, and the whole job
+ $main::success = 0;
+ }
+ # Put this task back on the todo queue
+ push @jobstep_todo, $jobstepidx;
+ $Job->{'tasks_summary'}->{'failed'}++;
}
- # Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
- $Job->{'tasks_summary'}->{'failed'}++;
+ else # task_success
+ {
+ push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
+ ++$thisround_succeeded;
+ $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
+ push @jobstep_done, $jobstepidx;
+ Log ($jobstepidx, "success in $elapsed seconds");
+ }
+ $Jobstep->{exitcode} = $childstatus;
+ $Jobstep->{finishtime} = time;
+ $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+ $Jobstep->{'arvados_task'}->save;
+ process_stderr_final ($jobstepidx);
+ Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
+
+ close $reader{$jobstepidx};
+ delete $reader{$jobstepidx};
+ delete $slot[$proc{$pid}->{slot}]->{pid};
+ push @freeslot, $proc{$pid}->{slot};
+ delete $proc{$pid};
+
+ $progress_is_dirty = 1;
}
- else
+
+ if (scalar(@successful_task_uuids) > 0)
{
- ++$thisround_succeeded;
- $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
- }
- $Jobstep->{exitcode} = $childstatus;
- $Jobstep->{finishtime} = time;
- $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, sprintf("task output (%d bytes): %s",
- length($Jobstep->{'arvados_task'}->{output}),
- $Jobstep->{'arvados_task'}->{output}));
-
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
- delete $slot[$proc{$pid}->{slot}]->{pid};
- push @freeslot, $proc{$pid}->{slot};
- delete $proc{$pid};
-
- if ($task_success) {
+ Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
# Load new tasks
my $newtask_list = [];
my $newtask_results;
do {
$newtask_results = api_call(
"job_tasks/list",
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
+ 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
'order' => 'qsequence',
'offset' => scalar(@$newtask_list),
- );
+ );
push(@$newtask_list, @{$newtask_results->{items}});
} while (@{$newtask_results->{items}});
+ Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
foreach my $arvados_task (@$newtask_list) {
my $jobstep = {
'level' => $arvados_task->{'sequence'},
}
}
- $progress_is_dirty = 1;
- 1;
+ return $children_reaped;
}
sub check_refresh_wanted
{
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
- if (@stat && $stat[9] > $latest_refresh) {
+ if (@stat &&
+ $stat[9] > $latest_refresh &&
+ # ...and we have actually locked the job record...
+ $job_id eq $Job->{'uuid'}) {
$latest_refresh = scalar time;
my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $jobstep (values %proc)
+ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
{
- if ($jobstep->{stderr_at} < $last_squeue_check)
+ if (!exists($js->{stderr_at}))
+ {
+ $js->{stderr_at} = 0;
+ }
+ if ($js->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
}
return if $silent_procs == 0;
# use killem() on procs whose killtime is reached
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if (exists $jobstep->{killtime}
- && $jobstep->{killtime} <= time
- && $jobstep->{stderr_at} < $last_squeue_check)
+ my $js = $jobstep[$procinfo->{jobstepidx}];
+ if (exists $procinfo->{killtime}
+ && $procinfo->{killtime} <= time
+ && $js->{stderr_at} < $last_squeue_check)
{
my $sincewhen = "";
- if ($jobstep->{stderr_at}) {
- $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+ if ($js->{stderr_at}) {
+ $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
}
- Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
}
# Check for child procs >60s old and not mentioned by squeue.
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if ($jobstep->{time} < time - 60
- && $jobstep->{jobstepname}
- && !exists $ok{$jobstep->{jobstepname}}
- && !exists $jobstep->{killtime})
+ if ($procinfo->{time} < time - 60
+ && $procinfo->{jobstepname}
+ && !exists $ok{$procinfo->{jobstepname}}
+ && !exists $procinfo->{killtime})
{
# According to slurm, this task has ended (successfully or not)
# -- but our srun child hasn't exited. First we must wait (30
# terminated, we'll conclude some slurm communication
# error/delay has caused the task to die without notifying srun,
# and we'll kill srun ourselves.
- $jobstep->{killtime} = time + 30;
- Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+ $procinfo->{killtime} = time + 30;
+ Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
+# Read whatever is currently available from the child pipes.
+#
+# Waits up to 0.1s for any stderr reader in %reader (or a job step's
+# stdout_r pipe, if it has one) to become readable, then does a single
+# sysread of up to 64 KiB from each ready handle. stdout data is
+# appended to the step's stdout_captured buffer. stderr data is
+# timestamped, buffered, passed through preprocess_stderr(), and also
+# copied verbatim to stderr_captured for steps that have that key
+# (srun_sync creates it and returns it to its caller).
+#
+# Returns 1 if any data was read, 0 otherwise.
sub readfrompipes
{
my $gotsome = 0;
- foreach my $job (keys %reader)
+ my %fd_job;
+ my $sel = IO::Select->new();
+ foreach my $jobstepidx (keys %reader)
+ {
+ my $fd = $reader{$jobstepidx};
+ $sel->add($fd);
+ $fd_job{$fd} = $jobstepidx;
+
+ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+ $sel->add($stdout_fd);
+ $fd_job{$stdout_fd} = $jobstepidx;
+ }
+ }
+ # select on all reader fds with 0.1s timeout
+ my @ready_fds = $sel->can_read(0.1);
+ foreach my $fd (@ready_fds)
{
my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
+ if (0 < sysread ($fd, $buf, 65536))
{
+ $gotsome = 1;
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- $jobstep[$job]->{stderr_at} = time;
- $jobstep[$job]->{stderr} .= $buf;
- preprocess_stderr ($job);
- if (length ($jobstep[$job]->{stderr}) > 16384)
+
+ my $jobstepidx = $fd_job{$fd};
+ # A step may have no stdout_r pipe; check defined first so we never
+ # compare undef numerically against the handle.
+ my $stdout_r = $jobstep[$jobstepidx]->{stdout_r};
+ if (defined($stdout_r) && $stdout_r == $fd) {
+ $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ next;
+ }
+
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
+
+ # The {stderr} buffer is consumed (and logged) by preprocess_stderr,
+ # so keep a separate verbatim copy for steps that asked for captured
+ # stderr. Without this, stderr_captured would always stay empty.
+ if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
+ $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
+ }
+
+ # Consume everything up to the last \n
+ preprocess_stderr ($jobstepidx);
+
+ if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
{
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ # If we get a lot of stderr without a newline, chop off the
+ # front to avoid letting our buffer grow indefinitely.
+ substr ($jobstep[$jobstepidx]->{stderr},
+ 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
}
- $gotsome = 1;
}
}
return $gotsome;
}
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
#
# Each complete line is removed from the buffer, logged with a "stderr "
# prefix, and then matched against a few known failure messages:
#   - SLURM allocation expired/unconfirmed -> set $main::please_freeze
#   - node failure or srun io error        -> mark step tempfail, count a
#                                             failure on the node and ban it
#   - job step creation/communication error -> mark step tempfail, ban node
#   - Keep read/write/request error          -> mark step tempfail
# Node bookkeeping is skipped when the step has no slotindex.
#
# Argument: $jobstepidx, an index into @jobstep.
sub preprocess_stderr
{
- my $job = shift;
+ my $jobstepidx = shift;
+ # slotindex is only defined for children running Arvados job tasks.
+ # Be prepared to handle the undef case (for setup srun calls, etc.).
+ my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
# Peel off one newline-terminated line per iteration; a trailing partial
# line never matches and is left in the buffer.
- while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+ while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
- substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
- Log ($job, "stderr $line");
+ substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+ Log ($jobstepidx, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# srun says our SLURM job is expired or unconfirmed: raise the global
# freeze request flag.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: Node failure on/) {
- my $job_slot_index = $jobstep[$job]->{slotindex};
- $slot[$job_slot_index]->{node}->{fail_count}++;
- $jobstep[$job]->{tempfail} = 1;
- ban_node_by_slot($job_slot_index);
+ elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ if (defined($job_slot_index)) {
+ $slot[$job_slot_index]->{node}->{fail_count}++;
+ ban_node_by_slot($job_slot_index);
+ }
}
elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
- $jobstep[$job]->{tempfail} = 1;
- ban_node_by_slot($jobstep[$job]->{slotindex});
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
}
- elsif ($line =~ /arvados\.errors\.Keep/) {
- $jobstep[$job]->{tempfail} = 1;
+ elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
+ $jobstep[$jobstepidx]->{tempfail} = 1;
}
}
}
# Final flush of a job step's stderr buffer; reapchildren and srun_sync
# call this after the step's process has been reaped.
#
# Complete lines are handled by preprocess_stderr as usual. Whatever is
# left (text with no trailing newline) is logged line by line and the
# buffer is then emptied. Note that this leftover text is only logged; it
# is not matched against the failure patterns in preprocess_stderr.
#
# Argument: $jobstepidx, an index into @jobstep.
-sub process_stderr
+sub process_stderr_final
{
- my $job = shift;
- my $task_success = shift;
- preprocess_stderr ($job);
+ my $jobstepidx = shift;
+ preprocess_stderr ($jobstepidx);
# Log the unterminated remainder.
map {
- Log ($job, "stderr $_");
- } split ("\n", $jobstep[$job]->{stderr});
+ Log ($jobstepidx, "stderr $_");
+ } split ("\n", $jobstep[$jobstepidx]->{stderr});
+ $jobstep[$jobstepidx]->{stderr} = '';
}
sub fetch_block
}
if (!exists $proc{$_}->{"sent_$sig"})
{
- Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+ Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
kill $sig, $_;
select (undef, undef, undef, 0.1);
if ($sig == 2)
close($log_pipe_in);
+ my $logger_failed = 0;
my $read_result = log_writer_read_output(120);
if ($read_result == -1) {
+ $logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
} elsif ($read_result != 0) {
+ $logger_failed = -2;
Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
if ($?) {
+ $logger_failed ||= $?;
Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
}
close($log_pipe_out);
- my $arv_put_output = $log_pipe_out_buf;
+ my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
$log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
$log_pipe_out_select = undef;
return $log_pipe_pid;
}
-sub Log # ($jobstep_id, $logmessage)
+sub Log # ($jobstepidx, $logmessage)
{
- if ($_[1] =~ /\n/) {
+ my ($jobstepidx, $logmessage) = @_;
+ if ($logmessage =~ /\n/) {
for my $line (split (/\n/, $_[1])) {
- Log ($_[0], $line);
+ Log ($jobstepidx, $line);
}
return;
}
my $fh = select STDERR; $|=1; select $fh;
- my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+ my $task_qseq = '';
+ if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+ $task_qseq = $jobstepidx;
+ }
+ my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
+ my $log_manifest = log_writer_finish();
+ return unless defined($log_manifest);
- my $log_manifest = "";
if ($Job->{log}) {
my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
- $log_manifest .= $prev_log_coll->{manifest_text};
+ $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
}
- $log_manifest .= log_writer_finish();
my $log_coll = api_call(
"collections/create", ensure_unique_name => 1, collection => {
}
+# Run a command through srun() in a child process and wait for it to
+# finish, while continuing to read child output and to run the periodic
+# refresh/squeue checks.
+#
+# Arguments:
+#   $srunargs - arrayref, passed to srun() as its first argument
+#   $execargs - arrayref, passed to srun() as its second argument
+#   $opts     - optional hashref, passed through to srun(); {label} is
+#               used in log messages (default: "@$execargs")
+#   $stdin    - optional, passed through to srun() as its fourth argument
+#
+# Returns a list ($exited, $stdout, $stderr):
+#   $exited - the wait status ($?) of the child, forced to 255 if it was 0
+#             but a freeze was requested or the step was flagged tempfail
+#   $stdout - everything read from the child's stdout pipe
+#   $stderr - the step's stderr_captured buffer. It is only initialized
+#             here; whatever the pipe-reading code appends to it is
+#             returned as-is.
+#
+# Dies (croak) if pipe() or fork() fails.
+sub srun_sync
+{
+  my ($srunargs, $execargs, $opts, $stdin) = @_;
+  $opts ||= {};
+
+  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+  Log (undef, "$label: start");
+
+  my ($stderr_r, $stderr_w);
+  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+  my ($stdout_r, $stdout_w);
+  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+  my $srunpid = fork();
+  # fork() returns undef on failure. Without this check, undef == 0 would
+  # be true and the parent itself would run the child branch below.
+  croak("fork() failed: $!") if !defined $srunpid;
+  if ($srunpid == 0)
+  {
+    # Child: send stdout/stderr into the pipes and hand off to srun().
+    close($stderr_r);
+    close($stdout_r);
+    # NOTE(review): F_SETFL changes file status flags; close-on-exec is a
+    # descriptor flag (F_SETFD). Kept as in the original -- confirm intent.
+    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+    fcntl($stdout_w, F_SETFL, 0) or croak($!);
+    open(STDERR, ">&", $stderr_w);
+    open(STDOUT, ">&", $stdout_w);
+    srun ($srunargs, $execargs, $opts, $stdin);
+    exit (1);
+  }
+  close($stderr_w);
+  close($stdout_w);
+
+  set_nonblocking($stderr_r);
+  set_nonblocking($stdout_r);
+
+  # Add entries to @jobstep and %proc so check_squeue() and
+  # freeze_if_want_freeze() can treat it like a job task process.
+  push @jobstep, {
+    stderr => '',
+    stderr_at => 0,
+    stderr_captured => '',
+    stdout_r => $stdout_r,
+    stdout_captured => '',
+  };
+  my $jobstepidx = $#jobstep;
+  $proc{$srunpid} = {
+    jobstepidx => $jobstepidx,
+  };
+  $reader{$jobstepidx} = $stderr_r;
+
+  # Poll until the child has been reaped.
+  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+    my $busy = readfrompipes();
+    # Run the periodic checks when idle, or when more than 2 seconds have
+    # passed since $latest_refresh even though output keeps arriving.
+    if (!$busy || ($latest_refresh + 2 < scalar time)) {
+      check_refresh_wanted();
+      check_squeue();
+    }
+    if (!$busy) {
+      select(undef, undef, undef, 0.1);
+    }
+    killem(keys %proc) if $main::please_freeze;
+  }
+  my $exited = $?;
+
+  # Drain anything still sitting in the pipes, then flush the stderr buffer.
+  1 while readfrompipes();
+  process_stderr_final ($jobstepidx);
+
+  Log (undef, "$label: exit ".exit_status_s($exited));
+
+  close($stdout_r);
+  close($stderr_r);
+  delete $proc{$srunpid};
+  delete $reader{$jobstepidx};
+
+  # Remove the temporary jobstep entry pushed above.
+  my $j = pop @jobstep;
+  # If the srun showed signs of tempfail, ensure the caller treats that as a
+  # failure case.
+  if ($main::please_freeze || $j->{tempfail}) {
+    $exited ||= 255;
+  }
+  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
sub srun
{
my $srunargs = shift;
$Log->("Built Python SDK virtualenv");
}
- my $pip_bin = "pip";
+ my @pysdk_version_cmd = ("python", "-c",
+ "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
if ($venv_built) {
$Log->("Running in Python SDK virtualenv");
- $pip_bin = "$venv_dir/bin/pip";
+ @pysdk_version_cmd = ();
my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
@ARGV = ("/bin/sh", "-ec",
". \Q$venv_dir/bin/activate\E; exec $orig_argv");
"\$PATH. Can't install Python SDK.");
}
- my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
- if ($pkgs) {
- $Log->("Using Arvados SDK:");
- foreach my $line (split /\n/, $pkgs) {
- $Log->($line);
+ if (@pysdk_version_cmd) {
+ open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
+ my $pysdk_version = <$pysdk_version_pipe>;
+ close($pysdk_version_pipe);
+ if ($? == 0) {
+ chomp($pysdk_version);
+ $Log->("Using Arvados SDK version $pysdk_version");
+ } else {
+ # A lot could've gone wrong here, but pretty much all of it means that
+ # Python won't be able to load the Arvados SDK.
+ $Log->("Warning: Arvados SDK not found");
}
- } else {
- $Log->("Arvados SDK packages not found");
}
while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {