my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
+my $cgroup_root = "/sys/fs/cgroup";
my $docker_bin = "docker.io";
my $docker_run_args = "";
GetOptions('force-unlock' => \$force_unlock,
'job-api-token=s' => \$job_api_token,
'no-clear-tmp' => \$no_clear_tmp,
'resume-stash=s' => \$resume_stash,
+ 'cgroup-root=s' => \$cgroup_root,
'docker-bin=s' => \$docker_bin,
'docker-run-args=s' => \$docker_run_args,
);
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_checked = 0;
+my $sinfo_checked = 0;
my $latest_refresh = scalar time;
Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if $docker_bin images -q --no-trunc --all | grep -xF \Q$docker_hash\E >/dev/null; then
+ exit 0
+fi
+declare -a exit_codes=("\${PIPESTATUS[@]}")
+if [ 0 != "\${exit_codes[0]}" ]; then
+ exit "\${exit_codes[0]}" # `docker images` failed
+elif [ 1 != "\${exit_codes[1]}" ]; then
+ exit "\${exit_codes[1]}" # `grep` encountered an error
+else
+ # Everything worked fine, but grep didn't find the image on this host.
+ arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
# Determine whether this version of Docker supports memory+swap limits.
($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
$docker_limitmem = ($stdout =~ /--memory-swap/);
$try_user_arg = "--user=$try_user";
}
my ($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
{label => $label});
.q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
- ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
+ ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
+ .q{&& declare -a VOLUMES=() }
+ .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
+ .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
+ .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
$command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
$ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
{
my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
$command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
# We only set memory limits if Docker lets us limit both memory and swap.
# Memory limits alone have been supported longer, but subprocesses tend
# For now, use the same approach as TASK_WORK above.
$ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+ # Bind mount the crunchrunner binary and host TLS certificates file into
+ # the container.
+ $command .= '"${VOLUMES[@]}" ';
+
while (my ($env_key, $env_val) = each %ENV)
{
if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
}
} else {
# Non-docker run
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
+ $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
$command .= $stdbuf;
$command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
sub reapchildren
{
my $children_reaped = 0;
- while ((my $pid = waitpid (-1, WNOHANG)) > 0)
+ my @successful_task_uuids = ();
+
+ while((my $pid = waitpid (-1, WNOHANG)) > 0)
{
my $childstatus = $?;
. $slot[$proc{$pid}->{slot}]->{cpu});
my $jobstepidx = $proc{$pid}->{jobstepidx};
- if (!WIFEXITED($childstatus))
- {
- # child did not exit (may be temporarily stopped)
- Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
- next;
- }
-
$children_reaped++;
my $elapsed = time - $proc{$pid}->{time};
my $Jobstep = $jobstep[$jobstepidx];
push @jobstep_todo, $jobstepidx;
$Job->{'tasks_summary'}->{'failed'}++;
}
- else
+ else # task_success
{
+ push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
++$thisround_succeeded;
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
- if ($task_success) {
- # Load new tasks
- my $newtask_list = [];
- my $newtask_results;
- do {
- $newtask_results = api_call(
- "job_tasks/list",
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- push(@$newtask_list, @{$newtask_results->{items}});
- } while (@{$newtask_results->{items}});
- foreach my $arvados_task (@$newtask_list) {
- my $jobstep = {
- 'level' => $arvados_task->{'sequence'},
- 'failures' => 0,
- 'arvados_task' => $arvados_task
- };
- push @jobstep, $jobstep;
- push @jobstep_todo, $#jobstep;
- }
- }
$progress_is_dirty = 1;
}
+ if (scalar(@successful_task_uuids) > 0)
+ {
+ Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
+ # Load new tasks
+ my $newtask_list = [];
+ my $newtask_results;
+ do {
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ push(@$newtask_list, @{$newtask_results->{items}});
+ } while (@{$newtask_results->{items}});
+ Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
+ foreach my $arvados_task (@$newtask_list) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'failures' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
+ }
+
return $children_reaped;
}
}
}
+sub check_sinfo
+{
+ # If a node fails in a multi-node "srun" call during job setup, the call
+ # may hang instead of exiting with a nonzero code. This function checks
+ # "sinfo" for the health of the nodes that were allocated and ensures that
+ # they are all still in the "alloc" state. If a node that is allocated to
+ # this job is not in "alloc" state, then set please_freeze.
+ #
+ # This is only called from srun_sync() for node configuration. If a
+ # node fails doing actual work, there are other recovery mechanisms.
+
+ # Do not call `sinfo` more than once every 15 seconds.
+ return if $sinfo_checked > time - 15;
+ $sinfo_checked = time;
+
+ # The output format "%t" means output node states.
+ my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
+ if ($? != 0)
+ {
+ Log(undef, "warning: sinfo exit status $? ($!)");
+ return;
+ }
+ chop @sinfo;
+
+ foreach (@sinfo)
+ {
+ if ($_ != "alloc" && $_ != "alloc*") {
+ $main::please_freeze = 1;
+ }
+ }
+}
sub release_allocation
{
sub preprocess_stderr
{
my $jobstepidx = shift;
+ # slotindex is only defined for children running Arvados job tasks.
+ # Be prepared to handle the undef case (for setup srun calls, etc.).
+ my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
Log ($jobstepidx, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
- # whoa.
+ # If the allocation is revoked, we can't possibly continue, so mark all
+ # nodes as failed. This will cause the overall exit code to be
+ # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
+ # this job.
$main::please_freeze = 1;
- }
- elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
- # Skip the following tempfail checks if this srun proc isn't
- # attached to a particular worker slot.
+ foreach my $st (@slot) {
+ $st->{node}->{fail_count}++;
+ }
}
elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
- my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
- $slot[$job_slot_index]->{node}->{fail_count}++;
$jobstep[$jobstepidx]->{tempfail} = 1;
- ban_node_by_slot($job_slot_index);
+ if (defined($job_slot_index)) {
+ $slot[$job_slot_index]->{node}->{fail_count}++;
+ ban_node_by_slot($job_slot_index);
+ }
}
elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
$jobstep[$jobstepidx]->{tempfail} = 1;
- ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
+ ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
}
- elsif ($line =~ /arvados\.errors\.Keep/) {
+ elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
$jobstep[$jobstepidx]->{tempfail} = 1;
}
}
return $s;
}
-
sub srun_sync
{
my $srunargs = shift;
if (!$busy || ($latest_refresh + 2 < scalar time)) {
check_refresh_wanted();
check_squeue();
+ check_sinfo();
}
if (!$busy) {
select(undef, undef, undef, 0.1);
delete $reader{$jobstepidx};
my $j = pop @jobstep;
+ # If the srun showed signs of tempfail, ensure the caller treats that as a
+ # failure case.
+ if ($main::please_freeze || $j->{tempfail}) {
+ $exited ||= 255;
+ }
return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
}