use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
+use constant TASK_TEMPFAIL => 111;
use constant EX_TEMPFAIL => 75;
$ENV{"TMPDIR"} ||= "/tmp";
my $cleanpid = fork();
if ($cleanpid == 0)
{
  # Find FUSE mounts that look like Keep mounts (the mount path has the
  # word "keep") and unmount them. Then clean up work directories.
  # TODO: When #5036 is done and widely deployed, we can get rid of the
  # regular expression and just unmount everything with type fuse.keep.
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+ ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
exit (1);
}
while (1)
}
$docker_stream =~ s/^\.//;
my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
my $childslotname = join (".",
$slot[$childslot]->{node}->{name},
$slot[$childslot]->{cpu});
+
my $childpid = fork();
if ($childpid == 0)
{
$command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
- my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
$command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
||
(@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
- last THISROUND if $main::please_freeze;
+ last THISROUND if $main::please_freeze || defined($main::success);
if ($main::please_info)
{
$main::please_info = 0;
Log (undef, "Failed to write output collection");
}
else {
- Log(undef, "output hash " . $collated_output);
+ Log(undef, "job output $collated_output");
$Job->update_attributes('output' => $collated_output);
}
{
my $temporary_fail;
$temporary_fail ||= $Jobstep->{node_fail};
- $temporary_fail ||= ($exitvalue == 111);
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
++$thisround_failed;
++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
# Give up on this task, and the whole job
$main::success = 0;
- $main::please_freeze = 1;
}
# Put this task back on the todo queue
push @jobstep_todo, $jobstepid;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
$Jobstep->{'arvados_task'}->save;
process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+ Log ($jobstepid, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
close $reader{$jobstepid};
delete $reader{$jobstepid};
# whoa.
$main::please_freeze = 1;
}
- elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
+ elsif ($line =~ /srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure)/) {
$jobstep[$job]->{node_fail} = 1;
ban_node_by_slot($jobstep[$job]->{slotindex});
}
# fetch_block($hash) - fetch the content of a Keep block via arv-get.
#
# Arguments:
#   $hash - a Keep locator (32-hex digest, optionally with +size/+hints).
#
# Returns the block content as a string, or undef on any failure
# (arv-get could not be started, a read error occurred, or arv-get
# exited nonzero). Failures are logged rather than fatal so the caller
# can decide how to proceed.
sub fetch_block
{
  my $hash = shift;
  my $keep;
  # List-form open avoids the shell entirely, so $hash cannot be
  # interpreted as shell metacharacters.
  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
    Log(undef, "fetch_block run error from arv-get $hash: $!");
    return undef;
  }
  my $output_block = "";
  while (1) {
    my $buf;
    my $bytes = sysread($keep, $buf, 1024 * 1024);
    if (!defined $bytes) {
      Log(undef, "fetch_block read error from arv-get: $!");
      $output_block = undef;
      last;
    } elsif ($bytes == 0) {
      # sysread returns 0 at the end of the pipe.
      last;
    } else {
      # Accumulate the data just read; without this the function would
      # always return an empty string.
      $output_block .= $buf;
    }
  }
  close $keep;
  # A nonzero exit from arv-get means the data is untrustworthy even if
  # we read something, so discard it.
  if ($?) {
    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
    $output_block = undef;
  }
  return $output_block;
}
# Create a collection by concatenating the output of all tasks (each
# task's output is either a manifest fragment, a locator for a
# manifest fragment stored in Keep, or nothing at all). Return the
# portable_data_hash of the new collection.
# create_output_collection() - collate all task outputs into one
# collection on the API server.
#
# Spawns a small Python helper that reads a manifest text on stdin and
# creates a collection from it, then feeds it each task's output
# (either the output text itself, or the content of the Keep block the
# output locator points to).
#
# Returns the new collection's portable_data_hash, or undef on failure
# (fetch error, write error, helper exit nonzero, or timeout). Sets
# $main::success = 0 when a task's output cannot be retrieved.
sub create_output_collection
{
  Log (undef, "collate");

  my ($child_out, $child_in);
  # The helper retries API calls itself (num_retries), using the same
  # retry count as the rest of this script.
  my $pid = open2($child_out, $child_in, 'python', '-c', q{
import arvados
import sys
print (arvados.api("v1").collections().
       create(body={"manifest_text": sys.stdin.read()}).
       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
}, retry_count());

  my $task_idx = -1;
  my $manifest_size = 0;
  for (@jobstep)
  {
    ++$task_idx;
    my $output = $_->{'arvados_task'}->{output};
    next if (!defined($output));
    my $next_write;
    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
      # Output is a Keep locator: fetch the manifest fragment it names.
      $next_write = fetch_block($output);
    } else {
      # Output is literal manifest text.
      $next_write = $output;
    }
    if (defined($next_write)) {
      if (!defined(syswrite($child_in, $next_write))) {
        # There's been an error writing. Stop the loop.
        # We'll log details about the exit code later.
        last;
      } else {
        $manifest_size += length($next_write);
      }
    } else {
      my $uuid = $_->{'arvados_task'}->{'uuid'};
      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
      $main::success = 0;
    }
  }
  close($child_in);
  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");

  my $joboutput;
  my $s = IO::Select->new($child_out);
  if ($s->can_read(120)) {
    # A portable_data_hash is tiny; 1 MiB is plenty for one read.
    sysread($child_out, $joboutput, 1024 * 1024);
    waitpid($pid, 0);
    if ($?) {
      Log(undef, "output collection creation exited " . exit_status_s($?));
      $joboutput = undef;
    } else {
      chomp($joboutput);
    }
  } else {
    Log (undef, "timed out while creating output collection");
    # Escalate signals gently: a few INTs, then TERMs, then KILL,
    # stopping as soon as the child has been reaped.
    foreach my $signal (2, 2, 2, 15, 15, 9) {
      kill($signal, $pid);
      last if waitpid($pid, WNOHANG) == -1;
      sleep(1);
    }
  }
  close($child_out);
  return $joboutput;
}
{
my $logfilename = shift;
$log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
- 'arv-put', '--portable-data-hash',
+ 'arv-put',
+ '--portable-data-hash',
+ '--project-uuid', $Job->{owner_uuid},
'--retries', '3',
+ '--name', $logfilename,
'--filename', $logfilename,
'-');
}
use File::Path qw( make_path remove_tree );
use POSIX qw(getcwd);
+use constant TASK_TEMPFAIL => 111;
+
# Map SDK subdirectories to the path environments they belong to.
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $venv_dir = "$job_work/.arvados.venv";
my $venv_built = -e "$venv_dir/bin/activate";
if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
- shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+ shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
"--python=python2.7", $venv_dir);
- shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+ shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
$venv_built = 1;
$Log->("Built Python SDK virtualenv");
}
unlink "$destdir.commit";
mkdir $destdir;
-open TARX, "|-", "tar", "-xC", $destdir;
-{
- local $/ = undef;
- print TARX <DATA>;
+
+if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+ die "Error launching 'tar -xC $destdir': $!";
+}
+# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+# get SIGPIPE. We must feed it data incrementally.
+my $tar_input;
+while (read(DATA, $tar_input, 65536)) {
+ print TARX $tar_input;
}
if(!close(TARX)) {
die "'tar -xC $destdir' exited $?: $!";
}
if (-e "$destdir/crunch_scripts/install") {
- shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+ shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
# Old version
- shell_or_die ("./tests/autotests.sh", $install_dir);
+ shell_or_die (undef, "./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
- shell_or_die ("./install.sh", $install_dir);
+ shell_or_die (undef, "./install.sh", $install_dir);
}
if ($commit) {
# shell_or_die($exitcode, @cmd) - run @cmd via system(), and terminate
# this process if it fails.
#
# Arguments:
#   $exitcode - exit status to use on failure, or undef to exit with
#               the command's own exit status (or 1 if it was killed by
#               a signal, so we never exit 0 on failure).
#   @cmd      - command and arguments, passed to system() in list form
#               (no shell interpolation).
#
# On failure, dumps $destdir.log to the original stderr, warns with the
# failed command and its status, then exits; does not return to the
# caller in that case.
sub shell_or_die
{
  my $exitcode = shift;

  if ($ENV{"DEBUG"}) {
    print STDERR "@_\n";
  }
  if (system (@_) != 0) {
    my $err = $!;
    my $code = $?;
    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
    # Restore the real stderr before dumping the log so the output is
    # visible even if STDERR was redirected.
    open STDERR, ">&STDERR_ORIG";
    system ("cat $destdir.log >&2");
    warn "@_ failed ($err): $exitstatus";
    if (defined($exitcode)) {
      exit $exitcode;
    }
    else {
      # ($code >> 8) is 0 when the child died from a signal; fall back
      # to 1 so we never exit successfully after a failure.
      exit (($code >> 8) || 1);
    }
  }
}