Merge branch '3785-job-log-collection-owner' closes #3785
[arvados.git] / sdk / cli / bin / crunch-job
index 3539a57a039246c35b1e92b1f8f11b51a3c0f28c..d69aee6c7b084881f9b4f8d04123f034a5cdb2f3 100755 (executable)
@@ -356,8 +356,12 @@ if (!defined $no_clear_tmp) {
   my $cleanpid = fork();
   if ($cleanpid == 0)
   {
+    # Find FUSE mounts that look like Keep mounts (the mount path has the
+    # word "keep") and unmount them.  Then clean up work directories.
+    # TODO: When #5036 is done and widely deployed, we can get rid of the
+    # regular expression and just unmount everything with type fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
     exit (1);
   }
   while (1)
@@ -1270,10 +1274,10 @@ sub fetch_block
   return $output_block;
 }
 
-# create_output_collections generates a new collection containing the
-# output of each successfully completed task, and returns the
-# portable_data_hash for the new collection.
-#
+# Create a collection by concatenating the output of all tasks (each
+# task's output is either a manifest fragment, a locator for a
+# manifest fragment stored in Keep, or nothing at all). Return the
+# portable_data_hash of the new collection.
 sub create_output_collection
 {
   Log (undef, "collate");
@@ -1288,10 +1292,11 @@ sub create_output_collection
                   '.execute()["portable_data_hash"]'
       );
 
+  my $task_idx = -1;
   for (@jobstep)
   {
-    next if (!exists $_->{'arvados_task'}->{'output'} ||
-             !$_->{'arvados_task'}->{'success'});
+    ++$task_idx;
+    next unless exists $_->{'arvados_task'}->{'output'};
     my $output = $_->{'arvados_task'}->{output};
     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
     {
@@ -1303,7 +1308,8 @@ sub create_output_collection
     }
     else
     {
-      Log (undef, "XXX fetch_block($output) failed XXX");
+      my $uuid = $_->{'arvados_task'}->{'uuid'};
+      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
       $main::success = 0;
     }
   }
@@ -1394,8 +1400,11 @@ sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
-                        'arv-put', '--portable-data-hash',
+                        'arv-put',
+                        '--portable-data-hash',
+                        '--project-uuid', $Job->{owner_uuid},
                         '--retries', '3',
+                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
 }
@@ -1846,25 +1855,24 @@ if (@ARGV) {
   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
     shell_or_die("virtualenv", "--quiet", "--system-site-packages",
                  "--python=python2.7", $venv_dir);
-    shell_or_die("$venv_dir/bin/pip", "--quiet", "install", $python_src);
+    shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
     $venv_built = 1;
     $Log->("Built Python SDK virtualenv");
   }
 
-  my $pkgs;
+  my $pip_bin = "pip";
   if ($venv_built) {
     $Log->("Running in Python SDK virtualenv");
-    $pkgs = `(\Q$venv_dir/bin/pip\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
+    $pip_bin = "$venv_dir/bin/pip";
     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
     @ARGV = ("/bin/sh", "-ec",
              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
   } elsif (-d $python_src) {
-    $Log->("Warning: virtualenv not found inside Docker container default " +
+    $Log->("Warning: virtualenv not found inside Docker container default " .
            "\$PATH. Can't install Python SDK.");
-  } else {
-    $pkgs = `(pip freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
   }
 
+  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
   if ($pkgs) {
     $Log->("Using Arvados SDK:");
     foreach my $line (split /\n/, $pkgs) {
@@ -1904,10 +1912,15 @@ if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
 
 unlink "$destdir.commit";
 mkdir $destdir;
-open TARX, "|-", "tar", "-xC", $destdir;
-{
-  local $/ = undef;
-  print TARX <DATA>;
+
+if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+  die "Error launching 'tar -xC $destdir': $!";
+}
+# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+# get SIGPIPE.  We must feed it data incrementally.
+my $tar_input;
+while (read(DATA, $tar_input, 65536)) {
+  print TARX $tar_input;
 }
 if(!close(TARX)) {
   die "'tar -xC $destdir' exited $?: $!";