X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/16384e6d47c96c21e9159ff70c809cac4e7d57da..18ddae75beb91b0792ad34a9b4996316715e67b7:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 820d142e26..d69aee6c7b 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -356,8 +356,12 @@ if (!defined $no_clear_tmp) { my $cleanpid = fork(); if ($cleanpid == 0) { + # Find FUSE mounts that look like Keep mounts (the mount path has the + # word "keep") and unmount them. Then clean up work directories. + # TODO: When #5036 is done and widely deployed, we can get rid of the + # regular expression and just unmount everything with type fuse.keep. srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); + ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); exit (1); } while (1) @@ -1270,10 +1274,10 @@ sub fetch_block return $output_block; } -# create_output_collections generates a new collection containing the -# output of each successfully completed task, and returns the -# portable_data_hash for the new collection. -# +# Create a collection by concatenating the output of all tasks (each +# task's output is either a manifest fragment, a locator for a +# manifest fragment stored in Keep, or nothing at all). Return the +# portable_data_hash of the new collection. sub create_output_collection { Log (undef, "collate"); @@ -1288,10 +1292,11 @@ sub create_output_collection '.execute()["portable_data_hash"]' ); + my $task_idx = -1; for (@jobstep) { - next if (!exists $_->{'arvados_task'}->{'output'} || - !$_->{'arvados_task'}->{'success'}); + ++$task_idx; + next unless exists $_->{'arvados_task'}->{'output'}; my $output = $_->{'arvados_task'}->{output}; if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/) { @@ -1303,7 +1308,8 @@ sub create_output_collection } else { - Log (undef, "XXX fetch_block($output) failed XXX"); + my $uuid = $_->{'arvados_task'}->{'uuid'}; + Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)"); $main::success = 0; } } @@ -1394,8 +1400,11 @@ sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, - 'arv-put', '--portable-data-hash', + 'arv-put', + '--portable-data-hash', + '--project-uuid', $Job->{owner_uuid}, '--retries', '3', + '--name', $logfilename, '--filename', $logfilename, '-'); } @@ -1846,21 +1855,33 @@ if (@ARGV) { if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) { shell_or_die("virtualenv", "--quiet", "--system-site-packages", "--python=python2.7", $venv_dir); - shell_or_die("$venv_dir/bin/pip", "--quiet", "install", $python_src); + shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src); $venv_built = 1; $Log->("Built Python SDK virtualenv"); } + my $pip_bin = "pip"; if ($venv_built) { $Log->("Running in Python SDK virtualenv"); + $pip_bin = "$venv_dir/bin/pip"; my $orig_argv = join(" ", map { quotemeta($_); } @ARGV); @ARGV = ("/bin/sh", "-ec", ". \Q$venv_dir/bin/activate\E; exec $orig_argv"); } elsif (-d $python_src) { - $Log->("Warning: virtualenv not found inside Docker container default " + + $Log->("Warning: virtualenv not found inside Docker container default " . "\$PATH. Can't install Python SDK."); } + my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`; + if ($pkgs) { + $Log->("Using Arvados SDK:"); + foreach my $line (split /\n/, $pkgs) { + $Log->($line); + } + } else { + $Log->("Arvados SDK packages not found"); + } + while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) { my $sdk_path = "$install_dir/$sdk_dir"; if (-d $sdk_path) { @@ -1891,10 +1912,15 @@ if (readlink ("$destdir.commit") eq $commit && -d $destdir) { unlink "$destdir.commit"; mkdir $destdir; -open TARX, "|-", "tar", "-xC", $destdir; -{ - local $/ = undef; - print TARX ; + +if (!open(TARX, "|-", "tar", "-xC", $destdir)) { + die "Error launching 'tar -xC $destdir': $!"; +} +# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we +# get SIGPIPE. We must feed it data incrementally. +my $tar_input; +while (read(DATA, $tar_input, 65536)) { + print TARX $tar_input; } if(!close(TARX)) { die "'tar -xC $destdir' exited $?: $!";