Merge branch 'master' into 8079-api-client-auth-uuid
[arvados.git] / sdk / cli / bin / crunch-job
index f78824bf2f14c468a946160335537b4fd66b7702..4e5b0826b6bceabf326b07b65660a5ec4c93f946 100755 (executable)
@@ -127,6 +127,7 @@ my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
 my $docker_bin = "docker.io";
+my $docker_run_args = "";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
@@ -134,6 +135,7 @@ GetOptions('force-unlock' => \$force_unlock,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
            'docker-bin=s' => \$docker_bin,
+           'docker-run-args=s' => \$docker_run_args,
     );
 
 if (defined $job_api_token) {
@@ -413,11 +415,13 @@ if (!defined $no_clear_tmp) {
 # If this job requires a Docker image, install that.
 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
+  Log (undef, "Install docker image $docker_locator");
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
   {
     croak("No Docker image hash found from locator $docker_locator");
   }
+  Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
@@ -467,7 +471,7 @@ fi
     }
     srun(["srun", "--nodelist=" . $node[0]],
          ["/bin/sh", "-ec",
-          "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
+          "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
           " test \$a -ne 0"],
          {fork => 1});
     if ($? == 0) {
@@ -870,11 +874,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
-    $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
 
+    my $keep_mnt = $ENV{"TASK_WORK"}.".keep";
+
     $ENV{"GZIP"} = "-n";
 
     my @srunargs = (
@@ -886,23 +891,32 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
     my $stdbuf = " stdbuf --output=0 --error=0 ";
 
+    my $arv_file_cache = "";
+    if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
+      $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
+    }
+
     my $command =
-       "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
-        ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
-       ."&& cd $ENV{CRUNCH_TMP} "
+       "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
+        ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
+       ."&& cd \Q$ENV{CRUNCH_TMP}\E "
         # These environment variables get used explicitly later in
         # $command.  No tool is expected to read these values directly.
         .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' </proc/meminfo) }
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-pdh --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+
+    $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
+    $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
+    $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp";
+
     if ($docker_hash)
     {
       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
+      $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
       # to get SIGKILL if they exceed that without any swap limit set.
@@ -917,14 +931,18 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
       $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
 
-      # Currently, we make arv-mount's mount point appear at /keep
-      # inside the container (instead of using the same path as the
-      # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
-      # crunch scripts and utilities must not rely on this. They must
-      # use $TASK_KEEPMOUNT.
+      # Currently, we make the "by_pdh" directory in arv-mount's mount
+      # point appear at /keep inside the container (instead of using
+      # the same path as the host like we do with CRUNCH_SRC and
+      # CRUNCH_INSTALL). However, crunch scripts and utilities must
+      # not rely on this. They must use $TASK_KEEPMOUNT.
       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
       $ENV{TASK_KEEPMOUNT} = "/keep";
 
+      # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp.
+      $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E ";
+      $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp";
+
       # TASK_WORK is almost exactly like a docker data volume: it
       # starts out empty, is writable, and persists until no
       # containers use it any more. We don't use --volumes-from to
@@ -965,7 +983,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         $command .= $stdbuf;
         $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
       } else {
-        $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+        $command .= "/bin/sh -c \'python -c " .
+            '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
+            ">&2 2>/dev/null; " .
+            "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
             "if which stdbuf >/dev/null ; then " .
             "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
             " else " .
@@ -1038,12 +1059,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       check_refresh_wanted();
       check_squeue();
       update_progress_stats();
-      select (undef, undef, undef, 0.1);
     }
     elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
     {
       update_progress_stats();
     }
+    if (!$gotsome) {
+      select (undef, undef, undef, 0.1);
+    }
     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                         $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1182,6 +1205,9 @@ sub reapchildren
 
   if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
+    Log($jobstepid, sprintf(
+          "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+          exit_status_s($childstatus)));
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
     $task_success = 0;
@@ -1318,8 +1344,9 @@ sub check_squeue
   # squeue check interval (15s) this should make the squeue check an
   # infrequent event.
   my $silent_procs = 0;
-  for my $jobstep (values %proc)
+  for my $procinfo (values %proc)
   {
+    my $jobstep = $jobstep[$procinfo->{jobstep}];
     if ($jobstep->{stderr_at} < $last_squeue_check)
     {
       $silent_procs++;
@@ -1328,17 +1355,18 @@ sub check_squeue
   return if $silent_procs == 0;
 
   # use killem() on procs whose killtime is reached
-  while (my ($pid, $jobstep) = each %proc)
+  while (my ($pid, $procinfo) = each %proc)
   {
-    if (exists $jobstep->{killtime}
-        && $jobstep->{killtime} <= time
+    my $jobstep = $jobstep[$procinfo->{jobstep}];
+    if (exists $procinfo->{killtime}
+        && $procinfo->{killtime} <= time
         && $jobstep->{stderr_at} < $last_squeue_check)
     {
       my $sincewhen = "";
       if ($jobstep->{stderr_at}) {
         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
       }
-      Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
       killem ($pid);
     }
   }
@@ -1373,12 +1401,12 @@ sub check_squeue
   }
 
   # Check for child procs >60s old and not mentioned by squeue.
-  while (my ($pid, $jobstep) = each %proc)
+  while (my ($pid, $procinfo) = each %proc)
   {
-    if ($jobstep->{time} < time - 60
-        && $jobstep->{jobstepname}
-        && !exists $ok{$jobstep->{jobstepname}}
-        && !exists $jobstep->{killtime})
+    if ($procinfo->{time} < time - 60
+        && $procinfo->{jobstepname}
+        && !exists $ok{$procinfo->{jobstepname}}
+        && !exists $procinfo->{killtime})
     {
       # According to slurm, this task has ended (successfully or not)
       # -- but our srun child hasn't exited. First we must wait (30
@@ -1387,8 +1415,8 @@ sub check_squeue
       # terminated, we'll conclude some slurm communication
       # error/delay has caused the task to die without notifying srun,
       # and we'll kill srun ourselves.
-      $jobstep->{killtime} = time + 30;
-      Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+      $procinfo->{killtime} = time + 30;
+      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
@@ -1410,15 +1438,21 @@ sub readfrompipes
   foreach my $job (keys %reader)
   {
     my $buf;
-    while (0 < sysread ($reader{$job}, $buf, 8192))
+    if (0 < sysread ($reader{$job}, $buf, 65536))
     {
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
       $jobstep[$job]->{stderr_at} = time;
       $jobstep[$job]->{stderr} .= $buf;
+
+      # Consume everything up to the last \n
       preprocess_stderr ($job);
+
       if (length ($jobstep[$job]->{stderr}) > 16384)
       {
-       substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+        # If we get a lot of stderr without a newline, chop off the
+        # front to avoid letting our buffer grow indefinitely.
+        substr ($jobstep[$job]->{stderr},
+                0, length($jobstep[$job]->{stderr}) - 8192) = "";
       }
       $gotsome = 1;
     }
@@ -1439,7 +1473,7 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: Node failure on/) {
+    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
       my $job_slot_index = $jobstep[$job]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
       $jobstep[$job]->{tempfail} = 1;
@@ -1698,20 +1732,24 @@ sub log_writer_finish()
 
   close($log_pipe_in);
 
+  my $logger_failed = 0;
   my $read_result = log_writer_read_output(120);
   if ($read_result == -1) {
+    $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
   } elsif ($read_result != 0) {
+    $logger_failed = -2;
     Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
   if ($?) {
+    $logger_failed ||= $?;
     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
   close($log_pipe_out);
-  my $arv_put_output = $log_pipe_out_buf;
+  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
       $log_pipe_out_select = undef;
 
@@ -1777,13 +1815,13 @@ sub save_meta
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
+  my $log_manifest = log_writer_finish();
+  return unless defined($log_manifest);
 
-  my $log_manifest = "";
   if ($Job->{log}) {
     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
-    $log_manifest .= $prev_log_coll->{manifest_text};
+    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
   }
-  $log_manifest .= log_writer_finish();
 
   my $log_coll = api_call(
     "collections/create", ensure_unique_name => 1, collection => {
@@ -2162,10 +2200,11 @@ if (@ARGV) {
     $Log->("Built Python SDK virtualenv");
   }
 
-  my $pip_bin = "pip";
+  my @pysdk_version_cmd = ("python", "-c",
+    "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
   if ($venv_built) {
     $Log->("Running in Python SDK virtualenv");
-    $pip_bin = "$venv_dir/bin/pip";
+    @pysdk_version_cmd = ();
     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
     @ARGV = ("/bin/sh", "-ec",
              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
@@ -2174,14 +2213,18 @@ if (@ARGV) {
            "\$PATH. Can't install Python SDK.");
   }
 
-  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
-  if ($pkgs) {
-    $Log->("Using Arvados SDK:");
-    foreach my $line (split /\n/, $pkgs) {
-      $Log->($line);
+  if (@pysdk_version_cmd) {
+    open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
+    my $pysdk_version = <$pysdk_version_pipe>;
+    close($pysdk_version_pipe);
+    if ($? == 0) {
+      chomp($pysdk_version);
+      $Log->("Using Arvados SDK version $pysdk_version");
+    } else {
+      # A lot could've gone wrong here, but pretty much all of it means that
+      # Python won't be able to load the Arvados SDK.
+      $Log->("Warning: Arvados SDK not found");
     }
-  } else {
-    $Log->("Arvados SDK packages not found");
   }
 
   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {