X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1ea3251891770887654a3d9ae68f5f1cf7f1d689..c3074f48c15ae7d1f4bc30939959c7243708cb37:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 4cd4182883..92fe886412 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -356,8 +356,12 @@ if (!defined $no_clear_tmp) { my $cleanpid = fork(); if ($cleanpid == 0) { + # Find FUSE mounts that look like Keep mounts (the mount path has the + # word "keep") and unmount them. Then clean up work directories. + # TODO: When #5036 is done and widely deployed, we can get rid of the + # regular expression and just unmount everything with type fuse.keep. srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); + ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); exit (1); } while (1) @@ -380,7 +384,7 @@ if ($docker_locator = $Job->{docker_image_locator}) { } $docker_stream =~ s/^\.//; my $docker_install_script = qq{ -if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then +if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load fi }; @@ -666,6 +670,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my $childslotname = join (".", $slot[$childslot]->{node}->{name}, $slot[$childslot]->{cpu}); + + if ($docker_hash) { + $Jobstep->{cidfile} = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid"; + } + my $childpid = fork(); if ($childpid == 0) { @@ -717,9 +726,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec "; if ($docker_hash) { - my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid"; - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; - $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy "; + $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$Jobstep->{cidfile} -poll=10000 "; + $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$Jobstep->{cidfile} --sig-proxy "; # Dynamically configure the container to use the host system as its # DNS server. Get the host's global addresses from the ip command, @@ -827,7 +835,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) || (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { - last THISROUND if $main::please_freeze; + last THISROUND if $main::please_freeze || defined($main::success); if ($main::please_info) { $main::please_info = 0; @@ -937,7 +945,7 @@ if (!$collated_output) { Log (undef, "Failed to write output collection"); } else { - Log(undef, "output hash " . $collated_output); + Log(undef, "job output $collated_output"); $Job->update_attributes('output' => $collated_output); } @@ -1033,7 +1041,6 @@ sub reapchildren if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { # Give up on this task, and the whole job $main::success = 0; - $main::please_freeze = 1; } # Put this task back on the todo queue push @jobstep_todo, $jobstepid; @@ -1052,7 +1059,9 @@ sub reapchildren $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); $Jobstep->{'arvados_task'}->save; process_stderr ($jobstepid, $task_success); - Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); + Log ($jobstepid, sprintf("task output (%d bytes): %s", + length($Jobstep->{'arvados_task'}->{output}), + $Jobstep->{'arvados_task'}->{output})); close $reader{$jobstepid}; delete $reader{$jobstepid}; @@ -1060,6 +1069,11 @@ sub reapchildren push @freeslot, $proc{$pid}->{slot}; delete $proc{$pid}; + if (defined($Jobstep->{cidfile})) { + unlink $Jobstep->{cidfile}; + delete $Jobstep->{cidfile}; + } + if ($task_success) { # Load new tasks my $newtask_list = []; @@ -1248,16 +1262,19 @@ sub process_stderr sub fetch_block { my $hash = shift; - my ($keep, $child_out, $output_block); - - my $cmd = "arv-get \Q$hash\E"; - open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!"; - $output_block = ''; + my $keep; + if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) { + Log(undef, "fetch_block run error from arv-get $hash: $!"); + return undef; + } + my $output_block = ""; while (1) { my $buf; my $bytes = sysread($keep, $buf, 1024 * 1024); if (!defined $bytes) { - die "reading from arv-get: $!"; + Log(undef, "fetch_block read error from arv-get: $!"); + $output_block = undef; + last; } elsif ($bytes == 0) { # sysread returns 0 at the end of the pipe. last; @@ -1267,60 +1284,80 @@ sub fetch_block } } close $keep; + if ($?) { + Log(undef, "fetch_block arv-get exited " . exit_status_s($?)); + $output_block = undef; + } return $output_block; } -# create_output_collections generates a new collection containing the -# output of each successfully completed task, and returns the -# portable_data_hash for the new collection. -# +# Create a collection by concatenating the output of all tasks (each +# task's output is either a manifest fragment, a locator for a +# manifest fragment stored in Keep, or nothing at all). Return the +# portable_data_hash of the new collection. sub create_output_collection { Log (undef, "collate"); my ($child_out, $child_in); - my $pid = open2($child_out, $child_in, 'python', '-c', - 'import arvados; ' . - 'import sys; ' . - 'print arvados.api()' . - '.collections()' . - '.create(body={"manifest_text":sys.stdin.read()})' . - '.execute()["portable_data_hash"]' - ); - + my $pid = open2($child_out, $child_in, 'python', '-c', q{ +import arvados +import sys +print (arvados.api("v1").collections(). + create(body={"manifest_text": sys.stdin.read()}). + execute(num_retries=int(sys.argv[1]))["portable_data_hash"]) +}, retry_count()); + + my $task_idx = -1; + my $manifest_size = 0; for (@jobstep) { - next if (!exists $_->{'arvados_task'}->{'output'} || - !$_->{'arvados_task'}->{'success'}); + ++$task_idx; my $output = $_->{'arvados_task'}->{output}; - if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/) - { - print $child_in $output; - } - elsif (defined (my $outblock = fetch_block ($output))) - { - print $child_in $outblock; + next if (!defined($output)); + my $next_write; + if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) { + $next_write = fetch_block($output); + } else { + $next_write = $output; } - else - { - Log (undef, "XXX fetch_block($output) failed XXX"); + if (defined($next_write)) { + if (!defined(syswrite($child_in, $next_write))) { + # There's been an error writing. Stop the loop. + # We'll log details about the exit code later. + last; + } else { + $manifest_size += length($next_write); + } + } else { + my $uuid = $_->{'arvados_task'}->{'uuid'}; + Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)"); $main::success = 0; } } - $child_in->close; + close($child_in); + Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens"); my $joboutput; my $s = IO::Select->new($child_out); if ($s->can_read(120)) { - sysread($child_out, $joboutput, 64 * 1024 * 1024); - chomp($joboutput); - # TODO: Ensure exit status == 0. + sysread($child_out, $joboutput, 1024 * 1024); + waitpid($pid, 0); + if ($?) { + Log(undef, "output collection creation exited " . exit_status_s($?)); + $joboutput = undef; + } else { + chomp($joboutput); + } } else { Log (undef, "timed out while creating output collection"); + foreach my $signal (2, 2, 2, 15, 15, 9) { + kill($signal, $pid); + last if waitpid($pid, WNOHANG) == -1; + sleep(1); + } } - # TODO: kill $pid instead of waiting, now that we've decided to - # ignore further output. - waitpid($pid, 0); + close($child_out); return $joboutput; } @@ -1394,8 +1431,11 @@ sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, - 'arv-put', '--portable-data-hash', + 'arv-put', + '--portable-data-hash', + '--project-uuid', $Job->{owner_uuid}, '--retries', '3', + '--name', $logfilename, '--filename', $logfilename, '-'); } @@ -1846,25 +1886,24 @@ if (@ARGV) { if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) { shell_or_die("virtualenv", "--quiet", "--system-site-packages", "--python=python2.7", $venv_dir); - shell_or_die("$venv_dir/bin/pip", "--quiet", "install", $python_src); + shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src); $venv_built = 1; $Log->("Built Python SDK virtualenv"); } - my $pkgs; + my $pip_bin = "pip"; if ($venv_built) { $Log->("Running in Python SDK virtualenv"); - $pkgs = `((\Q$venv_dir/bin/pip\E freeze 2>/dev/null | grep arvados) || dpkg --show '*arvados*')`; + $pip_bin = "$venv_dir/bin/pip"; my $orig_argv = join(" ", map { quotemeta($_); } @ARGV); @ARGV = ("/bin/sh", "-ec", ". \Q$venv_dir/bin/activate\E; exec $orig_argv"); } elsif (-d $python_src) { - $Log->("Warning: virtualenv not found inside Docker container default " + + $Log->("Warning: virtualenv not found inside Docker container default " . "\$PATH. Can't install Python SDK."); - } else { - $pkgs = `((pip freeze 2>/dev/null | grep arvados) || dpkg --show '*arvados*')`; } + my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`; if ($pkgs) { $Log->("Using Arvados SDK:"); foreach my $line (split /\n/, $pkgs) { @@ -1904,10 +1943,15 @@ if (readlink ("$destdir.commit") eq $commit && -d $destdir) { unlink "$destdir.commit"; mkdir $destdir; -open TARX, "|-", "tar", "-xC", $destdir; -{ - local $/ = undef; - print TARX ; + +if (!open(TARX, "|-", "tar", "-xC", $destdir)) { + die "Error launching 'tar -xC $destdir': $!"; +} +# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we +# get SIGPIPE. We must feed it data incrementally. +my $tar_input; +while (read(DATA, $tar_input, 65536)) { + print TARX $tar_input; } if(!close(TARX)) { die "'tar -xC $destdir' exited $?: $!";