15516: document the use of variables with the nameref argument set.
[arvados.git] / sdk / cli / bin / crunch-job
index 5e6c3a084ed49d4f5d9e8beff6d9e38f815a2c5c..242dff708be2800f29cc6969bde1cb8952e04a6f 100755 (executable)
@@ -1,10 +1,9 @@
 #!/usr/bin/env perl
+# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 # Copyright (C) The Arvados Authors. All rights reserved.
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
-
 =head1 NAME
 
 crunch-job: Execute job steps, save snapshots as requested, collate output.
@@ -133,6 +132,7 @@ my $resume_stash;
 my $cgroup_root = "/sys/fs/cgroup";
 my $docker_bin = "docker.io";
 my $docker_run_args = "";
+my $srun_sync_timeout = 15*60;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
@@ -142,6 +142,7 @@ GetOptions('force-unlock' => \$force_unlock,
            'cgroup-root=s' => \$cgroup_root,
            'docker-bin=s' => \$docker_bin,
            'docker-run-args=s' => \$docker_run_args,
+           'srun-sync-timeout=i' => \$srun_sync_timeout,
     );
 
 if (defined $job_api_token) {
@@ -837,8 +838,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       close($_);
     }
     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
-    open(STDOUT,">&writer");
-    open(STDERR,">&writer");
+    open(STDOUT,">&writer") or croak ($!);
+    open(STDERR,">&writer") or croak ($!);
 
     undef $dbh;
     undef $sth;
@@ -1022,7 +1023,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
-  $Jobstep->{'arvados_task'}->save;
+  retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
 
   splice @jobstep_todo, $todo_ptr, 1;
   --$todo_ptr;
@@ -1188,6 +1189,8 @@ sub reapchildren
                     . $slot[$proc{$pid}->{slot}]->{cpu});
     my $jobstepidx = $proc{$pid}->{jobstepidx};
 
+    readfrompipes_after_exit ($jobstepidx);
+
     $children_reaped++;
     my $elapsed = time - $proc{$pid}->{time};
     my $Jobstep = $jobstep[$jobstepidx];
@@ -1205,7 +1208,7 @@ sub reapchildren
             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
             exit_status_s($childstatus)));
       $Jobstep->{'arvados_task'}->{success} = 0;
-      $Jobstep->{'arvados_task'}->save;
+      retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
       $task_success = 0;
     }
 
@@ -1258,8 +1261,7 @@ sub reapchildren
     $Jobstep->{exitcode} = $childstatus;
     $Jobstep->{finishtime} = time;
     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-    $Jobstep->{'arvados_task'}->save;
-    process_stderr_final ($jobstepidx);
+    retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
     Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                               length($Jobstep->{'arvados_task'}->{output}),
                               $Jobstep->{'arvados_task'}->{output}));
@@ -1562,9 +1564,27 @@ sub preprocess_stderr
 }
 
 
-sub process_stderr_final
+# Read whatever is still available on its stderr+stdout pipes after
+# the given child process has exited.
+sub readfrompipes_after_exit
 {
   my $jobstepidx = shift;
+
+  # The fact that the child has exited allows some convenient
+  # simplifications: (1) all data must have already been written, so
+  # there's no need to wait for more once sysread returns 0; (2) the
+  # total amount of data available is bounded by the pipe buffer size,
+  # so it's safe to read everything into one string.
+  my $buf;
+  while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) {
+    $jobstep[$jobstepidx]->{stderr_at} = time;
+    $jobstep[$jobstepidx]->{stderr} .= $buf;
+  }
+  if ($jobstep[$jobstepidx]->{stdout_r}) {
+    while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) {
+      $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+    }
+  }
   preprocess_stderr ($jobstepidx);
 
   map {
@@ -1614,7 +1634,9 @@ sub create_output_collection
   Log (undef, "collate");
 
   my ($child_out, $child_in);
-  my $pid = open2($child_out, $child_in, 'python', '-c', q{
+  # This depends on the python-arvados-python-client package, which needs to be installed
+  # on the machine running crunch-dispatch (typically, the API server).
+  my $pid = open2($child_out, $child_in, '/usr/share/python2.7/dist/python-arvados-python-client/bin/python', '-c', q{
 import arvados
 import sys
 print (arvados.api("v1").collections().
@@ -1989,6 +2011,8 @@ sub srun_sync
   my ($stdout_r, $stdout_w);
   pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
 
+  my $started_srun = scalar time;
+
   my $srunpid = fork();
   if ($srunpid == 0)
   {
@@ -1996,8 +2020,8 @@ sub srun_sync
     close($stdout_r);
     fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
     fcntl($stdout_w, F_SETFL, 0) or croak($!);
-    open(STDERR, ">&", $stderr_w);
-    open(STDOUT, ">&", $stdout_w);
+    open(STDERR, ">&", $stderr_w) or croak ($!);
+    open(STDOUT, ">&", $stdout_w) or croak ($!);
     srun ($srunargs, $execargs, $opts, $stdin);
     exit (1);
   }
@@ -2032,12 +2056,17 @@ sub srun_sync
     if (!$busy) {
       select(undef, undef, undef, 0.1);
     }
+    if (($started_srun + $srun_sync_timeout) < scalar time) {
+      # Exceeded general timeout for "srun_sync" operations, likely
+      # means something got stuck on the remote node.
+      Log(undef, "srun_sync exceeded timeout, will fail.");
+      $main::please_freeze = 1;
+    }
     killem(keys %proc) if $main::please_freeze;
   }
   my $exited = $?;
 
-  1 while readfrompipes();
-  process_stderr_final ($jobstepidx);
+  readfrompipes_after_exit ($jobstepidx);
 
   Log (undef, "$label: exit ".exit_status_s($exited));
 
@@ -2129,6 +2158,7 @@ sub find_docker_image {
           return (undef, undef);  # More than one file in the Collection.
         } else {
           $filename = (split(/:/, $filedata, 3))[2];
+          $filename =~ s/\\([0-3][0-7][0-7])/chr(oct($1))/ge;
         }
       }
     }
@@ -2177,8 +2207,22 @@ sub retry_op {
   # that can be retried, the second function will be called with
   # the current try count (0-based), next try time, and error message.
   my $operation = shift;
-  my $retry_callback = shift;
+  my $op_text = shift;
   my $retries = retry_count();
+  my $retry_callback = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "$op_text failed: $errmsg. $retry_msg");
+  };
   foreach my $try_count (0..$retries) {
     my $next_try = time + (2 ** $try_count);
     my $result = eval { $operation->(@_); };
@@ -2201,25 +2245,11 @@ sub api_call {
   # This function will call that method, retrying as needed until
   # the current retry_count is exhausted, with a log on the first failure.
   my $method_name = shift;
-  my $log_api_retry = sub {
-    my ($try_count, $next_try_at, $errmsg) = @_;
-    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
-    $errmsg =~ s/\s/ /g;
-    $errmsg =~ s/\s+$//;
-    my $retry_msg;
-    if ($next_try_at < time) {
-      $retry_msg = "Retrying.";
-    } else {
-      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
-      $retry_msg = "Retrying at $next_try_fmt.";
-    }
-    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
-  };
   my $method = $arv;
   foreach my $key (split(/\//, $method_name)) {
     $method = $method->{$key};
   }
-  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+  return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_);
 }
 
 sub exit_status_s {
@@ -2493,8 +2523,8 @@ if ((-d $python_dir) and can_run("python2.7")) {
 
 # Hide messages from the install script (unless it fails: shell_or_die
 # will show $destdir.log in that case).
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
+open(STDOUT, ">>", "$destdir.log") or die ($!);
+open(STDERR, ">&", STDOUT) or die ($!);
 
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
@@ -2515,7 +2545,7 @@ close L;
 
 sub can_run {
   my $command_name = shift;
-  open(my $which, "-|", "which", $command_name);
+  open(my $which, "-|", "which", $command_name) or die ($!);
   while (<$which>) { }
   close($which);
   return ($? == 0);