#!/usr/bin/env perl
# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
=head1 NAME
my $cgroup_root = "/sys/fs/cgroup";
my $docker_bin = "docker.io";
my $docker_run_args = "";
+# Time limit, in seconds, for a single srun_sync operation (default
+# 15 minutes); can be overridden with --srun-sync-timeout.
+my $srun_sync_timeout = 15*60;
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job=s' => \$jobspec,
'cgroup-root=s' => \$cgroup_root,
'docker-bin=s' => \$docker_bin,
'docker-run-args=s' => \$docker_run_args,
+ 'srun-sync-timeout=i' => \$srun_sync_timeout,
);
if (defined $job_api_token) {
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-my ($exited, $stdout, $stderr) = srun_sync(
+my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
$cmd,
{label => "sanity check"});
# Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
# up work directories crunch_tmp/work, crunch_tmp/opt,
# crunch_tmp/src*.
- #
- # TODO: When #5036 is done and widely deployed, we can limit mount's
- # -t option to simply fuse.keep.
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ ['bash', '-ec', q{
+arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
+rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
+ }],
{label => "clean work dirs"});
if ($exited != 0) {
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
}
Log (undef, "docker image hash is $docker_hash");
$docker_stream =~ s/^\.//;
+# Shell script handed to bash via srun: exit 0 right away if the
+# image is already loaded, otherwise fetch it with arv-get, pipe it
+# into "$docker_bin load", and verify the expected image ID is then
+# present.  The text is interpolated by Perl (qq) first and then
+# interpreted by bash, so anything bash would expand inside double
+# quotes (such as backticks) must be single-quoted or escaped.
my $docker_install_script = qq{
-if $docker_bin images -q --no-trunc --all | grep -xF \Q$docker_hash\E >/dev/null; then
- exit 0
+loaded() {
+ id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
+ echo "image ID is \$id"
+ [[ \${id} = \Q$docker_hash\E ]]
+}
+if loaded >&2 2>/dev/null; then
+ echo >&2 "image is already present"
+ exit 0
fi
-declare -a exit_codes=("\${PIPESTATUS[@]}")
-if [ 0 != "\${exit_codes[0]}" ]; then
- exit "\${exit_codes[0]}" # `docker images` failed
-elif [ 1 != "\${exit_codes[1]}" ]; then
- exit "\${exit_codes[1]}" # `grep` encountered an error
-else
- # Everything worked fine, but grep didn't find the image on this host.
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+echo >&2 "docker image is not present; loading"
+arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if ! loaded >&2; then
+ echo >&2 '`docker load` exited 0, but image is not found (!)'
+ exit 1
fi
+echo >&2 "image loaded successfully"
};
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodelist=" . join(',', @node)],
["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
{label => "load docker image"});
if ($exited != 0)
{
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
}
# Determine whether this version of Docker supports memory+swap limits.
- ($exited, $stdout, $stderr) = srun_sync(
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
$docker_limitmem = ($stdout =~ /--memory-swap/);
# Find a non-root Docker user to use.
$label = "check whether user '$try_user' is UID 0";
$try_user_arg = "--user=$try_user";
}
- my ($exited, $stdout, $stderr) = srun_sync(
+ my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
Log(undef, "Container will run with $dockeruserarg");
}
last;
+ } elsif ($tempfail) {
+ exit_retry_unlocked();
}
}
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($stdout, $stderr);
- ($exited, $stdout, $stderr) = srun_sync(
+ my ($stdout, $stderr, $tempfail);
+ ($exited, $stdout, $stderr, $tempfail) = srun_sync(
\@srunargs, \@execargs,
{label => "run install script on all workers"},
- $build_script . $git_archive);
+ $build_script . $git_archive);
+ if ($tempfail) {
+ exit_retry_unlocked();
+ }
my $stderr_anything_from_script = 0;
for my $line (split(/\n/, $stderr)) {
{
Log (undef, "knob " . $_);
}
+# Look up the node records whose hostname is in @node (ordered by
+# hostname, limited to scalar(@node) results) and log each record's
+# hostname, uuid, and JSON-encoded properties.
+my $resp = api_call(
+ 'nodes/list',
+ 'filters' => [['hostname', 'in', \@node]],
+ 'order' => 'hostname',
+ 'limit' => scalar(@node),
+ );
+for my $n (@{$resp->{items}}) {
+ Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
+}
close($_);
}
fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
- open(STDOUT,">&writer");
- open(STDERR,">&writer");
+ open(STDOUT,">&writer") or croak ($!);
+ open(STDERR,">&writer") or croak ($!);
undef $dbh;
undef $sth;
delete $Jobstep->{tempfail};
$Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
splice @jobstep_todo, $todo_ptr, 1;
--$todo_ptr;
} elsif ($working_slot_count < 1) {
save_output_collection();
save_meta();
- exit(EX_RETRY_UNLOCKED);
+ exit_retry_unlocked();
} elsif ($thisround_succeeded == 0 &&
($thisround_failed == 0 || $thisround_failed > 4)) {
my $message = "stop because $thisround_failed tasks failed and none succeeded";
my $collated_output = save_output_collection();
Log (undef, "finish");
-save_meta();
+my $final_log = save_meta();
my $final_state;
-if ($collated_output && $main::success) {
+if ($collated_output && $final_log && $main::success) {
$final_state = 'Complete';
} else {
$final_state = 'Failed';
. $slot[$proc{$pid}->{slot}]->{cpu});
my $jobstepidx = $proc{$pid}->{jobstepidx};
+ readfrompipes_after_exit ($jobstepidx);
+
$children_reaped++;
my $elapsed = time - $proc{$pid}->{time};
my $Jobstep = $jobstep[$jobstepidx];
"ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
$task_success = 0;
}
$Jobstep->{exitcode} = $childstatus;
$Jobstep->{finishtime} = time;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
- process_stderr_final ($jobstepidx);
+ retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
Log ($jobstepidx, sprintf("task output (%d bytes): %s",
length($Jobstep->{'arvados_task'}->{output}),
$Jobstep->{'arvados_task'}->{output}));
$st->{node}->{fail_count}++;
}
}
- elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+ elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
if (defined($job_slot_index)) {
$slot[$job_slot_index]->{node}->{fail_count}++;
}
-sub process_stderr_final
+# Read whatever is still available on the stderr+stdout pipes of the
+# given child process, after that process has exited.
+sub readfrompipes_after_exit
{
my $jobstepidx = shift;
+
+ # The fact that the child has exited allows some convenient
+ # simplifications: (1) all data must have already been written, so
+ # there's no need to wait for more once sysread returns 0; (2) the
+ # total amount of data available is bounded by the pipe buffer size,
+ # so it's safe to read everything into one string.
+ my $buf;
+ while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
+ }
+ if ($jobstep[$jobstepidx]->{stdout_r}) {
+ while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
+ $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ }
+ }
preprocess_stderr ($jobstepidx);
map {
import arvados
import sys
print (arvados.api("v1").collections().
- create(body={"manifest_text": sys.stdin.read()}).
+ create(body={"manifest_text": sys.stdin.read(),
+ "owner_uuid": sys.argv[2]}).
execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
-}, retry_count());
+}, retry_count(), $Job->{owner_uuid});
my $task_idx = -1;
my $manifest_size = 0;
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
'arv-put',
'--stream',
- '--retries', '3',
+ '--retries', '6',
'--filename', $logfilename,
'-');
$log_pipe_out_buf = "";
close($log_pipe_in);
my $logger_failed = 0;
- my $read_result = log_writer_read_output(120);
+ my $read_result = log_writer_read_output(600);
if ($read_result == -1) {
$logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
my ($package, $file, $line) = caller;
my $message = "@_ at $file line $line\n";
Log (undef, $message);
+ release_allocation();
freeze() if @jobstep_todo;
create_output_collection() if @jobstep_todo;
cleanup();
});
Log(undef, "log collection is " . $log_coll->{portable_data_hash});
$Job->update_attributes('log' => $log_coll->{portable_data_hash});
+
+ return $log_coll->{portable_data_hash};
}
my ($stdout_r, $stdout_w);
pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+ my $started_srun = scalar time;
+
my $srunpid = fork();
if ($srunpid == 0)
{
close($stdout_r);
fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
fcntl($stdout_w, F_SETFL, 0) or croak($!);
- open(STDERR, ">&", $stderr_w);
- open(STDOUT, ">&", $stdout_w);
+ open(STDERR, ">&", $stderr_w) or croak ($!);
+ open(STDOUT, ">&", $stdout_w) or croak ($!);
srun ($srunargs, $execargs, $opts, $stdin);
exit (1);
}
if (!$busy) {
select(undef, undef, undef, 0.1);
}
+ if (!$main::please_freeze &&
+ ($started_srun + $srun_sync_timeout) < scalar time) {
+ # Exceeded general timeout for "srun_sync" operations, likely
+ # means something got stuck on the remote node.  Skip this when a
+ # freeze has already been requested, so the message is not logged
+ # again each time this check runs after the deadline has passed.
+ Log(undef, "srun_sync exceeded timeout, will fail.");
+ $main::please_freeze = 1;
+ }
killem(keys %proc) if $main::please_freeze;
}
my $exited = $?;
- 1 while readfrompipes();
- process_stderr_final ($jobstepidx);
+ readfrompipes_after_exit ($jobstepidx);
Log (undef, "$label: exit ".exit_status_s($exited));
if ($main::please_freeze || $j->{tempfail}) {
$exited ||= 255;
}
- return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail});
}
}
}
+# Log that a transient failure occurred and that re-dispatch is being
+# requested, then terminate this process with exit status
+# EX_RETRY_UNLOCKED.  Takes no arguments and does not return.
+sub exit_retry_unlocked {
+ Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED);
+ exit(EX_RETRY_UNLOCKED);
+}
+
sub retry_count {
# Calculate the number of times an operation should be retried,
# assuming exponential backoff, and that we're willing to retry as
# that can be retried, the second function will be called with
# the current try count (0-based), next try time, and error message.
my $operation = shift;
- my $retry_callback = shift;
+ my $op_text = shift;
my $retries = retry_count();
+ my $retry_callback = sub {
+ my ($try_count, $next_try_at, $errmsg) = @_;
+ $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+ $errmsg =~ s/\s/ /g;
+ $errmsg =~ s/\s+$//;
+ my $retry_msg;
+ if ($next_try_at < time) {
+ $retry_msg = "Retrying.";
+ } else {
+ my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+ $retry_msg = "Retrying at $next_try_fmt.";
+ }
+ Log(undef, "$op_text failed: $errmsg. $retry_msg");
+ };
foreach my $try_count (0..$retries) {
my $next_try = time + (2 ** $try_count);
my $result = eval { $operation->(@_); };
# This function will call that method, retrying as needed until
# the current retry_count is exhausted, with a log on the first failure.
my $method_name = shift;
- my $log_api_retry = sub {
- my ($try_count, $next_try_at, $errmsg) = @_;
- $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
- $errmsg =~ s/\s/ /g;
- $errmsg =~ s/\s+$//;
- my $retry_msg;
- if ($next_try_at < time) {
- $retry_msg = "Retrying.";
- } else {
- my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
- $retry_msg = "Retrying at $next_try_fmt.";
- }
- Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
- };
my $method = $arv;
foreach my $key (split(/\//, $method_name)) {
$method = $method->{$key};
}
- return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+ return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
}
sub exit_status_s {
# Hide messages from the install script (unless it fails: shell_or_die
# will show $destdir.log in that case).
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
+open(STDOUT, ">>", "$destdir.log") or die ($!);
+open(STDERR, ">&", STDOUT) or die ($!);
if (-e "$destdir/crunch_scripts/install") {
shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
sub can_run {
my $command_name = shift;
- open(my $which, "-|", "which", $command_name);
+ open(my $which, "-|", "which", $command_name) or die ($!);
while (<$which>) { }
close($which);
return ($? == 0);