use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
+use Cwd qw(realpath);
use Data::Dumper;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
+my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
+if ($? == 0) {
+ $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
+ chomp($gem_versions);
+ chop($gem_versions); # Strip the trailing ")" left over from gem list's "name (versions)" format
+} else {
+ $gem_versions = "";
+}
+Log(undef,
+ "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
Log (undef, "check slurm allocation");
my @slot;
my $cleanpid = fork();
if ($cleanpid == 0)
{
+ # Find FUSE mounts that look like Keep mounts (the mount path has the
+ # word "keep") and unmount them. Then clean up work directories.
+ # TODO: When #5036 is done and widely deployed, we can get rid of the
+ # regular expression and just unmount everything with type fuse.keep.
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+ ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
exit (1);
}
while (1)
return $output_block;
}
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
sub create_output_collection
{
Log (undef, "collate");
'.execute()["portable_data_hash"]'
);
+ my $task_idx = -1;
for (@jobstep)
{
- next if (!exists $_->{'arvados_task'}->{'output'} ||
- !$_->{'arvados_task'}->{'success'});
+ ++$task_idx;
+ next unless exists $_->{'arvados_task'}->{'output'};
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
}
else
{
- Log (undef, "XXX fetch_block($output) failed XXX");
+ my $uuid = $_->{'arvados_task'}->{'uuid'};
+ Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
$main::success = 0;
}
}
if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
shell_or_die("virtualenv", "--quiet", "--system-site-packages",
"--python=python2.7", $venv_dir);
- shell_or_die("$venv_dir/bin/pip", "--quiet", "install", $python_src);
+ shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
$venv_built = 1;
$Log->("Built Python SDK virtualenv");
}
+ my $pip_bin = "pip";
if ($venv_built) {
$Log->("Running in Python SDK virtualenv");
+ $pip_bin = "$venv_dir/bin/pip";
my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
@ARGV = ("/bin/sh", "-ec",
". \Q$venv_dir/bin/activate\E; exec $orig_argv");
} elsif (-d $python_src) {
- $Log->("Warning: virtualenv not found inside Docker container default " +
+ $Log->("Warning: virtualenv not found inside Docker container default " .
"\$PATH. Can't install Python SDK.");
}
+ my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
+ if ($pkgs) {
+ $Log->("Using Arvados SDK:");
+ foreach my $line (split /\n/, $pkgs) {
+ $Log->($line);
+ }
+ } else {
+ $Log->("Arvados SDK packages not found");
+ }
+
while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
my $sdk_path = "$install_dir/$sdk_dir";
if (-d $sdk_path) {
unlink "$destdir.commit";
mkdir $destdir;
-open TARX, "|-", "tar", "-xC", $destdir;
-{
- local $/ = undef;
- print TARX <DATA>;
+
+if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+ die "Error launching 'tar -xC $destdir': $!";
+}
+# If we send too much data to tar in one write (> 4-5 MiB), it stops
+# reading its input and we get SIGPIPE. We must feed it data incrementally.
+my $tar_input;
+while (read(DATA, $tar_input, 65536)) {
+ print TARX $tar_input;
}
if(!close(TARX)) {
die "'tar -xC $destdir' exited $?: $!";