6429: Add for test child container requests are set priority=0 when parent
[arvados.git] / sdk / cli / bin / crunch-job
index 5e8edefdfe79c2823524f812aefc7f929213d164..b2c14d5b713dbfaa19c17b5d324ea4c81c1df9b8 100755 (executable)
@@ -390,12 +390,12 @@ if (!defined $no_clear_tmp) {
   my $cleanpid = fork();
   if ($cleanpid == 0)
   {
-    # Find FUSE mounts that look like Keep mounts (the mount path has the
-    # word "keep") and unmount them.  Then clean up work directories.
-    # TODO: When #5036 is done and widely deployed, we can get rid of the
-    # regular expression and just unmount everything with type fuse.keep.
+    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
+    # Then clean up work directories.
+    # TODO: When #5036 is done and widely deployed, we can limit mount's
+    # -t option to simply fuse.keep.
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
+          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
     exit (1);
   }
   while (1)
@@ -411,7 +411,7 @@ if (!defined $no_clear_tmp) {
 }
 
 # If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
@@ -449,6 +449,42 @@ fi
       {fork => 1});
   $docker_limitmem = ($? == 0);
 
+  # Find a non-root Docker user to use.
+  # Tries the default user for the container, then 'crunch', then 'nobody',
+  # testing for whether the actual user id is non-zero.  This defends against
+  # mistakes but not malice, but we intend to harden the security in the future
+  # so we don't want anyone getting used to their jobs running as root in their
+  # Docker containers.
+  my @tryusers = ("", "crunch", "nobody");
+  foreach my $try_user (@tryusers) {
+    my $try_user_arg;
+    if ($try_user eq "") {
+      Log(undef, "Checking if container default user is not UID 0");
+      $try_user_arg = "";
+    } else {
+      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $try_user_arg = "--user=$try_user";
+    }
+    srun(["srun", "--nodelist=" . $node[0]],
+         ["/bin/sh", "-ec",
+          "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " .
+          " test \$a -ne 0"],
+         {fork => 1});
+    if ($? == 0) {
+      $dockeruserarg = $try_user_arg;
+      if ($try_user eq "") {
+        Log(undef, "Container will run with default user");
+      } else {
+        Log(undef, "Container will run with $dockeruserarg");
+      }
+      last;
+    }
+  }
+
+  if (!defined $dockeruserarg) {
+    croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+  }
+
   if ($Job->{arvados_sdk_version}) {
     # The job also specifies an Arvados SDK version.  Add the SDKs to the
     # tar file for the build script to install.
@@ -784,6 +820,9 @@ update_progress_stats();
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
+  # Don't create new tasks if we already know the job's final result.
+  last if defined($main::success);
+
   my $id = $jobstep_todo[$todo_ptr];
   my $Jobstep = $jobstep[$id];
   if ($Jobstep->{level} != $level)
@@ -844,6 +883,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
       "--job-name=$job_id.$id.$$",
        );
+
+    my $stdbuf = " stdbuf --output=0 --error=0 ";
+
+    my $arv_file_cache = "";
+    if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
+      $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
+    }
+
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
@@ -854,12 +901,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
-    $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+    $command .= "&& exec arv-mount --by-pdh --crunchstat-interval=10 --allow-other $arv_file_cache $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
+      my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+      $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
       # to get SIGKILL if they exceed that without any swap limit set.
@@ -917,12 +965,25 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
       $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
-      $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+      if ($Job->{arvados_sdk_version}) {
+        $command .= $stdbuf;
+        $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+      } else {
+        $command .= "/bin/sh -c \'python -c " .
+            '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
+            ">&2 2>/dev/null; " .
+            "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+            "if which stdbuf >/dev/null ; then " .
+            "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " else " .
+            "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " fi\'";
+      }
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
-      $command .= "stdbuf --output=0 --error=0 ";
+      $command .= $stdbuf;
       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
 
@@ -968,7 +1029,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         ||
         ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
   {
-    last THISROUND if $main::please_freeze || defined($main::success);
+    last THISROUND if $main::please_freeze;
     if ($main::please_info)
     {
       $main::please_info = 0;
@@ -980,7 +1041,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my $gotsome
        = readfrompipes ()
        + reapchildren ();
-    if (!$gotsome)
+    if (!$gotsome || ($latest_refresh + 2 < scalar time))
     {
       check_refresh_wanted();
       check_squeue();
@@ -1129,6 +1190,9 @@ sub reapchildren
 
   if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
+    Log($jobstepid, sprintf(
+          "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
+          exit_status_s($childstatus)));
     $Jobstep->{'arvados_task'}->{success} = 0;
     $Jobstep->{'arvados_task'}->save;
     $task_success = 0;
@@ -1645,20 +1709,24 @@ sub log_writer_finish()
 
   close($log_pipe_in);
 
+  my $logger_failed = 0;
   my $read_result = log_writer_read_output(120);
   if ($read_result == -1) {
+    $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
   } elsif ($read_result != 0) {
+    $logger_failed = -2;
     Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
   if ($?) {
+    $logger_failed ||= $?;
     Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
   close($log_pipe_out);
-  my $arv_put_output = $log_pipe_out_buf;
+  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
   $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
       $log_pipe_out_select = undef;
 
@@ -1724,13 +1792,13 @@ sub save_meta
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
+  my $log_manifest = log_writer_finish();
+  return unless defined($log_manifest);
 
-  my $log_manifest = "";
   if ($Job->{log}) {
     my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
-    $log_manifest .= $prev_log_coll->{manifest_text};
+    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
   }
-  $log_manifest .= log_writer_finish();
 
   my $log_coll = api_call(
     "collections/create", ensure_unique_name => 1, collection => {
@@ -2109,10 +2177,11 @@ if (@ARGV) {
     $Log->("Built Python SDK virtualenv");
   }
 
-  my $pip_bin = "pip";
+  my @pysdk_version_cmd = ("python", "-c",
+    "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
   if ($venv_built) {
     $Log->("Running in Python SDK virtualenv");
-    $pip_bin = "$venv_dir/bin/pip";
+    @pysdk_version_cmd = ();
     my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
     @ARGV = ("/bin/sh", "-ec",
              ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
@@ -2121,14 +2190,18 @@ if (@ARGV) {
            "\$PATH. Can't install Python SDK.");
   }
 
-  my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
-  if ($pkgs) {
-    $Log->("Using Arvados SDK:");
-    foreach my $line (split /\n/, $pkgs) {
-      $Log->($line);
+  if (@pysdk_version_cmd) {
+    open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
+    my $pysdk_version = <$pysdk_version_pipe>;
+    close($pysdk_version_pipe);
+    if ($? == 0) {
+      chomp($pysdk_version);
+      $Log->("Using Arvados SDK version $pysdk_version");
+    } else {
+      # A lot could've gone wrong here, but pretty much all of it means that
+      # Python won't be able to load the Arvados SDK.
+      $Log->("Warning: Arvados SDK not found");
     }
-  } else {
-    $Log->("Arvados SDK packages not found");
   }
 
   while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {