use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
+use constant TASK_TEMPFAIL => 111;
use constant EX_TEMPFAIL => 75;
$ENV{"TMPDIR"} ||= "/tmp";
# TODO: When #5036 is done and widely deployed, we can get rid of the
# regular expression and just unmount everything with type fuse.keep.
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+ ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
exit (1);
}
while (1)
# If this job requires a Docker image, install that.
my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
if ($docker_locator = $Job->{docker_image_locator}) {
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
}
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
.exit_status_s($?));
}
+ # Determine whether this version of Docker supports memory+swap limits.
+ srun(["srun", "--nodelist=" . $node[0]],
+ ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
+ {fork => 1});
+ $docker_limitmem = ($? == 0);
+
if ($Job->{arvados_sdk_version}) {
# The job also specifies an Arvados SDK version. Add the SDKs to the
# tar file for the build script to install.
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+ $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers. Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level. This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+ @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+ @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+ @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
+
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+ my $this_slot = $slot[$freeslot[$ii]];
+ my $node_name = $this_slot->{node}->{name};
+ $round_max_slots{$node_name} ||= $this_slot->{cpu};
+ last if (scalar(keys(%round_max_slots)) >= @node);
+}
+Log(undef, "start level $level with $round_num_freeslots slots");
my %proc;
-my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
update_progress_stats();
-
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
my $childslotname = join (".",
$slot[$childslot]->{node}->{name},
$slot[$childslot]->{cpu});
+
my $childpid = fork();
if ($childpid == 0)
{
$ENV{"HOME"} = $ENV{"TASK_WORK"};
$ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
- $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+ $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
$ENV{"GZIP"} = "-n";
my $command =
"if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
- ."&& cd $ENV{CRUNCH_TMP} ";
+ ."&& cd $ENV{CRUNCH_TMP} "
+ # These environment variables get used explicitly later in
+ # $command. No tool is expected to read these values directly.
+ .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
+ .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
+ ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
+ ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
$command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
- $Jobstep->{cidfile} = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}-$Jobstep->{failures}.cid";
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$Jobstep->{cidfile} -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$Jobstep->{cidfile} --sig-proxy ";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
+ $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+ # We only set memory limits if Docker lets us limit both memory and swap.
+ # Memory limits alone have been supported longer, but subprocesses tend
+ # to get SIGKILL if they exceed that without any swap limit set.
+ # See #5642 for additional background.
+ if ($docker_limitmem) {
+ $command .= "--memory=\${MEMLIMIT}k --memory-swap=\${SWAPLIMIT}k ";
+ }
# Dynamically configure the container to use the host system as its
# DNS server. Get the host's global addresses from the ip command,
while (!@freeslot
||
- (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+ ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
last THISROUND if $main::please_freeze || defined($main::success);
if ($main::please_info)
{
my $temporary_fail;
$temporary_fail ||= $Jobstep->{node_fail};
- $temporary_fail ||= ($exitvalue == 111);
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
++$thisround_failed;
++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
- if (defined($Jobstep->{cidfile})) {
- unlink $Jobstep->{cidfile};
- delete $Jobstep->{cidfile};
- }
-
if ($task_success) {
# Load new tasks
my $newtask_list = [];
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+ elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
$jobstep[$job]->{node_fail} = 1;
ban_node_by_slot($jobstep[$job]->{slotindex});
}
use File::Path qw( make_path remove_tree );
use POSIX qw(getcwd);
+use constant TASK_TEMPFAIL => 111;
+
# Map SDK subdirectories to the path environments they belong to.
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $venv_dir = "$job_work/.arvados.venv";
my $venv_built = -e "$venv_dir/bin/activate";
if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
- shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+ shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
"--python=python2.7", $venv_dir);
- shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+ shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
$venv_built = 1;
$Log->("Built Python SDK virtualenv");
}
}
if (-e "$destdir/crunch_scripts/install") {
- shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+ shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
# Old version
- shell_or_die ("./tests/autotests.sh", $install_dir);
+ shell_or_die (undef, "./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
- shell_or_die ("./install.sh", $install_dir);
+ shell_or_die (undef, "./install.sh", $install_dir);
}
if ($commit) {
sub shell_or_die
{
+ my $exitcode = shift;
+
if ($ENV{"DEBUG"}) {
print STDERR "@_\n";
}
if (system (@_) != 0) {
my $err = $!;
- my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+ my $code = $?;
+ my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
open STDERR, ">&STDERR_ORIG";
system ("cat $destdir.log >&2");
- die "@_ failed ($err): $exitstatus";
+ warn "@_ failed ($err): $exitstatus";
+ if (defined($exitcode)) {
+ exit $exitcode;
+ }
+ else {
+ exit (($code >> 8) || 1);
+ }
}
}