Merge branch 'master' into 5375-preview-collection-text-files
[arvados.git] / sdk / cli / bin / crunch-job
index c312f5d169fb77b7b853b113ce97195418dd2f56..b74d7af427ce0bc874454d3375082007abd2dbd9 100755 (executable)
@@ -96,6 +96,7 @@ use File::Temp;
 use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
 
+use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
 
 $ENV{"TMPDIR"} ||= "/tmp";
@@ -361,7 +362,7 @@ if (!defined $no_clear_tmp) {
     # TODO: When #5036 is done and widely deployed, we can get rid of the
     # regular expression and just unmount everything with type fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
+          ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
     exit (1);
   }
   while (1)
@@ -384,7 +385,7 @@ if ($docker_locator = $Job->{docker_image_locator}) {
   }
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
@@ -670,6 +671,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});
+
   my $childpid = fork();
   if ($childpid == 0)
   {
@@ -721,7 +723,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
       $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
 
@@ -831,7 +833,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         ||
         (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
-    last THISROUND if $main::please_freeze;
+    last THISROUND if $main::please_freeze || defined($main::success);
     if ($main::please_info)
     {
       $main::please_info = 0;
@@ -941,7 +943,7 @@ if (!$collated_output) {
   Log (undef, "Failed to write output collection");
 }
 else {
-  Log(undef, "output hash " . $collated_output);
+  Log(undef, "job output $collated_output");
   $Job->update_attributes('output' => $collated_output);
 }
 
@@ -1010,7 +1012,7 @@ sub reapchildren
   {
     my $temporary_fail;
     $temporary_fail ||= $Jobstep->{node_fail};
-    $temporary_fail ||= ($exitvalue == 111);
+    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
@@ -1037,7 +1039,6 @@ sub reapchildren
     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
       # Give up on this task, and the whole job
       $main::success = 0;
-      $main::please_freeze = 1;
     }
     # Put this task back on the todo queue
     push @jobstep_todo, $jobstepid;
@@ -1056,7 +1057,9 @@ sub reapchildren
   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
   $Jobstep->{'arvados_task'}->save;
   process_stderr ($jobstepid, $task_success);
-  Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
+  Log ($jobstepid, sprintf("task output (%d bytes): %s",
+                           length($Jobstep->{'arvados_task'}->{output}),
+                           $Jobstep->{'arvados_task'}->{output}));
 
   close $reader{$jobstepid};
   delete $reader{$jobstepid};
@@ -1230,7 +1233,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step/) {
+    elsif ($line =~ /srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure)/) {
       $jobstep[$job]->{node_fail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
@@ -1252,16 +1255,19 @@ sub process_stderr
 sub fetch_block
 {
   my $hash = shift;
-  my ($keep, $child_out, $output_block);
-
-  my $cmd = "arv-get \Q$hash\E";
-  open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
-  $output_block = '';
+  my $keep;
+  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
+    Log(undef, "fetch_block run error from arv-get $hash: $!");
+    return undef;
+  }
+  my $output_block = "";
   while (1) {
     my $buf;
     my $bytes = sysread($keep, $buf, 1024 * 1024);
     if (!defined $bytes) {
-      die "reading from arv-get: $!";
+      Log(undef, "fetch_block read error from arv-get: $!");
+      $output_block = undef;
+      last;
     } elsif ($bytes == 0) {
       # sysread returns 0 at the end of the pipe.
       last;
@@ -1271,6 +1277,10 @@ sub fetch_block
     }
   }
   close $keep;
+  if ($?) {
+    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
+    $output_block = undef;
+  }
   return $output_block;
 }
 
@@ -1283,50 +1293,64 @@ sub create_output_collection
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'python', '-c',
-                  'import arvados; ' .
-                  'import sys; ' .
-                  'print arvados.api()' .
-                  '.collections()' .
-                  '.create(body={"manifest_text":sys.stdin.read()})' .
-                  '.execute()["portable_data_hash"]'
-      );
+  my $pid = open2($child_out, $child_in, 'python', '-c', q{
+import arvados
+import sys
+print (arvados.api("v1").collections().
+       create(body={"manifest_text": sys.stdin.read()}).
+       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
+}, retry_count());
 
   my $task_idx = -1;
+  my $manifest_size = 0;
   for (@jobstep)
   {
     ++$task_idx;
-    next unless exists $_->{'arvados_task'}->{'output'};
     my $output = $_->{'arvados_task'}->{output};
-    if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
-    {
-      print $child_in $output;
-    }
-    elsif (defined (my $outblock = fetch_block ($output)))
-    {
-      print $child_in $outblock;
+    next if (!defined($output));
+    my $next_write;
+    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
+      $next_write = fetch_block($output);
+    } else {
+      $next_write = $output;
     }
-    else
-    {
+    if (defined($next_write)) {
+      if (!defined(syswrite($child_in, $next_write))) {
+        # There's been an error writing.  Stop the loop.
+        # We'll log details about the exit code later.
+        last;
+      } else {
+        $manifest_size += length($next_write);
+      }
+    } else {
       my $uuid = $_->{'arvados_task'}->{'uuid'};
       Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
       $main::success = 0;
     }
   }
-  $child_in->close;
+  close($child_in);
+  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");
 
   my $joboutput;
   my $s = IO::Select->new($child_out);
   if ($s->can_read(120)) {
-    sysread($child_out, $joboutput, 64 * 1024 * 1024);
-    chomp($joboutput);
-    # TODO: Ensure exit status == 0.
+    sysread($child_out, $joboutput, 1024 * 1024);
+    waitpid($pid, 0);
+    if ($?) {
+      Log(undef, "output collection creation exited " . exit_status_s($?));
+      $joboutput = undef;
+    } else {
+      chomp($joboutput);
+    }
   } else {
     Log (undef, "timed out while creating output collection");
+    foreach my $signal (2, 2, 2, 15, 15, 9) {
+      kill($signal, $pid);
+      last if waitpid($pid, WNOHANG) == -1;
+      sleep(1);
+    }
   }
-  # TODO: kill $pid instead of waiting, now that we've decided to
-  # ignore further output.
-  waitpid($pid, 0);
+  close($child_out);
 
   return $joboutput;
 }
@@ -1400,8 +1424,11 @@ sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
-                        'arv-put', '--portable-data-hash',
+                        'arv-put',
+                        '--portable-data-hash',
+                        '--project-uuid', $Job->{owner_uuid},
                         '--retries', '3',
+                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
 }
@@ -1805,6 +1832,8 @@ use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
 use POSIX qw(getcwd);
 
+use constant TASK_TEMPFAIL => 111;
+
 # Map SDK subdirectories to the path environments they belong to.
 my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
 
@@ -1850,9 +1879,9 @@ if (@ARGV) {
   my $venv_dir = "$job_work/.arvados.venv";
   my $venv_built = -e "$venv_dir/bin/activate";
   if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
-    shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
                  "--python=python2.7", $venv_dir);
-    shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
     $venv_built = 1;
     $Log->("Built Python SDK virtualenv");
   }
@@ -1948,12 +1977,12 @@ if ((-d $python_dir) and can_run("python2.7") and
 }
 
 if (-e "$destdir/crunch_scripts/install") {
-    shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
+    shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
     # Old version
-    shell_or_die ("./tests/autotests.sh", $install_dir);
+    shell_or_die (undef, "./tests/autotests.sh", $install_dir);
 } elsif (-e "./install.sh") {
-    shell_or_die ("./install.sh", $install_dir);
+    shell_or_die (undef, "./install.sh", $install_dir);
 }
 
 if ($commit) {
@@ -1974,15 +2003,24 @@ sub can_run {
 
 sub shell_or_die
 {
+  my $exitcode = shift;
+
   if ($ENV{"DEBUG"}) {
     print STDERR "@_\n";
   }
   if (system (@_) != 0) {
     my $err = $!;
-    my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+    my $code = $?;
+    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
     open STDERR, ">&STDERR_ORIG";
     system ("cat $destdir.log >&2");
-    die "@_ failed ($err): $exitstatus";
+    warn "@_ failed ($err): $exitstatus";
+    if (defined($exitcode)) {
+      exit $exitcode;
+    }
+    else {
+      exit (($code >> 8) || 1);
+    }
   }
 }