Merge branch '11917-dont-clear-cache'
[arvados.git] / sdk / cli / bin / crunch-job
index 149d20b1d887369027a313d360dd013bdfbc0933..afca52cb73f1809d5f9f25cca69ba28be8becab1 100755 (executable)
@@ -1,4 +1,8 @@
 #!/usr/bin/env perl
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
 
 =head1 NAME
@@ -355,6 +359,7 @@ my @jobstep_done = ();
 my @jobstep_tomerge = ();
 my $jobstep_tomerge_level = 0;
 my $squeue_checked = 0;
+my $sinfo_checked = 0;
 my $latest_refresh = scalar time;
 
 
@@ -392,12 +397,12 @@ if (!defined $no_clear_tmp) {
   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
   # up work directories crunch_tmp/work, crunch_tmp/opt,
   # crunch_tmp/src*.
-  #
-  # TODO: When #5036 is done and widely deployed, we can limit mount's
-  # -t option to simply fuse.keep.
   my ($exited, $stdout, $stderr) = srun_sync(
     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-    ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+    ['bash', '-ec', q{
+arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
+rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
+    }],
     {label => "clean work dirs"});
   if ($exited != 0) {
     exit(EX_RETRY_UNLOCKED);
@@ -416,18 +421,22 @@ if ($docker_locator = $Job->{docker_image_locator}) {
   Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
-if $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
-    exit 0
+loaded() {
+  id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1
+  echo "image ID is \$id"
+  [[ \${id} = \Q$docker_hash\E ]]
+}
+if loaded >&2 2>/dev/null; then
+  echo >&2 "image is already present"
+  exit 0
 fi
-declare -a exit_codes=("\${PIPESTATUS[@]}")
-if [ 0 != "\${exit_codes[0]}" ]; then
-   exit "\${exit_codes[0]}"  # `docker images` failed
-elif [ 1 != "\${exit_codes[1]}" ]; then
-   exit "\${exit_codes[1]}"  # `grep` encountered an error
-else
-   # Everything worked fine, but grep didn't find the image on this host.
-   arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+echo >&2 "docker image is not present; loading"
+arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+if ! loaded >&2; then
+  echo >&2 "`docker load` exited 0, but image is not found (!)"
+  exit 1
 fi
+echo >&2 "image loaded successfully"
 };
 
   my ($exited, $stdout, $stderr) = srun_sync(
@@ -714,6 +723,15 @@ foreach (split (/\n/, $Job->{knobs}))
 {
   Log (undef, "knob " . $_);
 }
+my $resp = api_call(
+  'nodes/list',
+  'filters' => [['hostname', 'in', \@node]],
+  'order' => 'hostname',
+  'limit' => scalar(@node),
+    );
+for my $n (@{$resp->{items}}) {
+  Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
+}
 
 
 
@@ -863,9 +881,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
         .q{&& declare -a VOLUMES=() }
-        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
-        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
-        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
+        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
+        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
+        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
 
     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
@@ -1116,10 +1134,10 @@ freeze();
 my $collated_output = save_output_collection();
 Log (undef, "finish");
 
-save_meta();
+my $final_log = save_meta();
 
 my $final_state;
-if ($collated_output && $main::success) {
+if ($collated_output && $final_log && $main::success) {
   $final_state = 'Complete';
 } else {
   $final_state = 'Failed';
@@ -1401,6 +1419,37 @@ sub check_squeue
   }
 }
 
+sub check_sinfo
+{
+  # If a node fails in a multi-node "srun" call during job setup, the call
+  # may hang instead of exiting with a nonzero code.  This function checks
+  # "sinfo" for the health of the nodes that were allocated and ensures that
+  # they are all still in the "alloc" state.  If a node that is allocated to
+  # this job is not in "alloc" state, then set please_freeze.
+  #
+  # This is only called from srun_sync() for node configuration.  If a
+  # node fails doing actual work, there are other recovery mechanisms.
+
+  # Do not call `sinfo` more than once every 15 seconds.
+  return if $sinfo_checked > time - 15;
+  $sinfo_checked = time;
+
+  # The output format "%t" means output node states.
+  my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
+  if ($? != 0)
+  {
+    Log(undef, "warning: sinfo exit status $? ($!)");
+    return;
+  }
+  chop @sinfo;
+
+  foreach (@sinfo)
+  {
+    if ($_ != "alloc" && $_ != "alloc*") {
+      $main::please_freeze = 1;
+    }
+  }
+}
 
 sub release_allocation
 {
@@ -1477,18 +1526,24 @@ sub preprocess_stderr
     my $line = $1;
     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
     Log ($jobstepidx, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
-      # whoa.
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
+      # If the allocation is revoked, we can't possibly continue, so mark all
+      # nodes as failed.  This will cause the overall exit code to be
+      # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
+      # this job.
       $main::please_freeze = 1;
+      foreach my $st (@slot) {
+        $st->{node}->{fail_count}++;
+      }
     }
-    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
         ban_node_by_slot($job_slot_index);
       }
     }
-    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+    elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
     }
@@ -1555,9 +1610,10 @@ sub create_output_collection
 import arvados
 import sys
 print (arvados.api("v1").collections().
-       create(body={"manifest_text": sys.stdin.read()}).
+       create(body={"manifest_text": sys.stdin.read(),
+                    "owner_uuid": sys.argv[2]}).
        execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
-}, retry_count());
+}, retry_count(), $Job->{owner_uuid});
 
   my $task_idx = -1;
   my $manifest_size = 0;
@@ -1708,7 +1764,7 @@ sub log_writer_start($)
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
                         '--stream',
-                        '--retries', '3',
+                        '--retries', '6',
                         '--filename', $logfilename,
                         '-');
   $log_pipe_out_buf = "";
@@ -1742,7 +1798,7 @@ sub log_writer_finish()
   close($log_pipe_in);
 
   my $logger_failed = 0;
-  my $read_result = log_writer_read_output(120);
+  my $read_result = log_writer_read_output(600);
   if ($read_result == -1) {
     $logger_failed = -1;
     Log (undef, "timed out reading from 'arv-put'");
@@ -1805,6 +1861,7 @@ sub croak
   my ($package, $file, $line) = caller;
   my $message = "@_ at $file line $line\n";
   Log (undef, $message);
+  release_allocation();
   freeze() if @jobstep_todo;
   create_output_collection() if @jobstep_todo;
   cleanup();
@@ -1845,6 +1902,8 @@ sub save_meta
     });
   Log(undef, "log collection is " . $log_coll->{portable_data_hash});
   $Job->update_attributes('log' => $log_coll->{portable_data_hash});
+
+  return $log_coll->{portable_data_hash};
 }
 
 
@@ -1906,7 +1965,6 @@ sub freezeunquote
   return $s;
 }
 
-
 sub srun_sync
 {
   my $srunargs = shift;
@@ -1961,6 +2019,7 @@ sub srun_sync
     if (!$busy || ($latest_refresh + 2 < scalar time)) {
       check_refresh_wanted();
       check_squeue();
+      check_sinfo();
     }
     if (!$busy) {
       select(undef, undef, undef, 0.1);
@@ -2066,7 +2125,7 @@ sub find_docker_image {
       }
     }
   }
-  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
+  if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) {
     return ($streamname, $1);
   } else {
     return (undef, undef);