6146: Ignore SIGPIPE while feeding data to tar. Rely on close() retval instead.
[arvados.git] / sdk / cli / bin / crunch-job
index d76bbd97cd03083265d2cd879c84708f4e8da4e6..864e3c0d37948bc86869234c52e51b1de77ad294 100755 (executable)
@@ -118,18 +118,21 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
 mkdir ($ENV{"JOB_WORK"});
 
+my %proc;
 my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $docker_bin = "/usr/bin/docker.io";
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'docker-bin=s' => \$docker_bin,
     );
 
 if (defined $job_api_token) {
@@ -137,7 +140,6 @@ if (defined $job_api_token) {
 }
 
 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
 
 
 $SIG{'USR1'} = sub
@@ -149,8 +151,6 @@ $SIG{'USR2'} = sub
   $main::ENV{CRUNCH_DEBUG} = 0;
 };
 
-
-
 my $arv = Arvados->new('apiVersion' => 'v1');
 
 my $Job;
@@ -159,12 +159,41 @@ my $dbh;
 my $sth;
 my @jobstep;
 
-my $User = api_call("users/current");
-
+my $local_job;
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
   $Job = api_call("jobs/get", uuid => $jobspec);
+  $local_job = 0;
+}
+else
+{
+  $Job = JSON::decode_json($jobspec);
+  $local_job = 1;
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if ($Job->{docker_image_locator}) {
+  $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+     $cmd,
+     {fork => 1});
+if ($? != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($?));
+  exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
     eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
@@ -176,8 +205,6 @@ if ($jobspec =~ /^[-a-z\d]+$/)
 }
 else
 {
-  $Job = JSON::decode_json($jobspec);
-
   if (!$resume_stash)
   {
     map { croak ("No $_ specified") unless $Job->{$_} }
@@ -375,7 +402,6 @@ if (!defined $no_clear_tmp) {
 }
 
 # If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
@@ -589,33 +615,89 @@ if (!defined $git_archive) {
   }
 }
 else {
-  Log(undef, "Run install script on all workers");
-
-  my @srunargs = ("srun",
-                  "--nodelist=$nodelist",
-                  "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
-  my @execargs = ("sh", "-c",
-                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+  my $install_exited;
+  my $install_script_tries_left = 3;
+  for (my $attempts = 0; $attempts < 3; $attempts++) {
+    Log(undef, "Run install script on all workers");
+
+    my @srunargs = ("srun",
+                    "--nodelist=$nodelist",
+                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+    my @execargs = ("sh", "-c",
+                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+    my ($install_stderr_r, $install_stderr_w);
+    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+    set_nonblocking($install_stderr_r);
+    my $installpid = fork();
+    if ($installpid == 0)
+    {
+      close($install_stderr_r);
+      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+      open(STDOUT, ">&", $install_stderr_w);
+      open(STDERR, ">&", $install_stderr_w);
+      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+      exit (1);
+    }
+    close($install_stderr_w);
+    # Tell freeze_if_want_freeze how to kill the child, otherwise the
+    # "waitpid(installpid)" loop won't get interrupted by a freeze:
+    $proc{$installpid} = {};
+    my $stderr_buf = '';
+    # Track whether anything appears on stderr other than slurm errors
+    # ("srun: ...") and the "starting: ..." message printed by the
+    # srun subroutine itself:
+    my $stderr_anything_from_script = 0;
+    my $match_our_own_errors = '^(srun: error: |starting: \[)';
+    while ($installpid != waitpid(-1, WNOHANG)) {
+      freeze_if_want_freeze ($installpid);
+      # Wait up to 0.1 seconds for something to appear on stderr, then
+      # do a non-blocking read.
+      my $bits = fhbits($install_stderr_r);
+      select ($bits, undef, $bits, 0.1);
+      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+      {
+        while ($stderr_buf =~ /^(.*?)\n/) {
+          my $line = $1;
+          substr $stderr_buf, 0, 1+length($line), "";
+          Log(undef, "stderr $line");
+          if ($line !~ /$match_our_own_errors/) {
+            $stderr_anything_from_script = 1;
+          }
+        }
+      }
+    }
+    delete $proc{$installpid};
+    $install_exited = $?;
+    close($install_stderr_r);
+    if (length($stderr_buf) > 0) {
+      if ($stderr_buf !~ /$match_our_own_errors/) {
+        $stderr_anything_from_script = 1;
+      }
+      Log(undef, "stderr $stderr_buf")
+    }
 
-  $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-  my $installpid = fork();
-  if ($installpid == 0)
-  {
-    srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $installpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($installpid);
-    select (undef, undef, undef, 0.1);
+    Log (undef, "Install script exited ".exit_status_s($install_exited));
+    last if $install_exited == 0 || $main::please_freeze;
+    # If the install script fails but doesn't print an error message,
+    # the next thing anyone is likely to do is just run it again in
+    # case it was a transient problem like "slurm communication fails
+    # because the network isn't reliable enough". So we'll just do
+    # that ourselves (up to 3 attempts in total). OTOH, if there is an
+    # error message, the problem is more likely to have a real fix and
+    # we should fail the job so the fixing process can start, instead
+    # of doing 2 more attempts.
+    last if $stderr_anything_from_script;
   }
-  my $install_exited = $?;
-  Log (undef, "Install script exited ".exit_status_s($install_exited));
+
   foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
     unlink($tar_filename);
   }
-  exit (1) if $install_exited != 0;
+
+  if ($install_exited != 0) {
+    croak("Giving up");
+  }
 }
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -681,7 +763,6 @@ for (my $ii = $#freeslot; $ii >= 0; $ii--) {
 }
 
 Log(undef, "start level $level with $round_num_freeslots slots");
-my %proc;
 my @holdslot;
 my %reader;
 my $progress_is_dirty = 1;
@@ -700,9 +781,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
 
-  pipe $reader{$id}, "writer" or croak ($!);
-  my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
-  fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+  pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+  set_nonblocking($reader{$id});
 
   my $childslot = $freeslot[0];
   my $childnode = $slot[$childslot]->{node};
@@ -1084,7 +1164,7 @@ sub reapchildren
 
     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                              ++$Jobstep->{'failures'},
-                             $temporary_fail ? 'temporary ' : 'permanent',
+                             $temporary_fail ? 'temporary' : 'permanent',
                              $elapsed));
 
     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
@@ -1653,7 +1733,13 @@ sub srun
   my $show_cmd = Dumper($args);
   $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
   $show_cmd =~ s/\n/ /g;
-  warn "starting: $show_cmd\n";
+  if ($opts->{fork}) {
+    Log(undef, "starting: $show_cmd");
+  } else {
+    # This is a child process: parent is in charge of reading our
+    # stderr and copying it to Log() if needed.
+    warn "starting: $show_cmd\n";
+  }
 
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1862,6 +1948,12 @@ sub combined_git_archive {
   return $tar_contents;
 }
 
+sub set_nonblocking {
+  my $fh = shift;
+  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
+  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+}
+
 __DATA__
 #!/usr/bin/perl
 #
@@ -1895,6 +1987,9 @@ my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
 my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
 for my $dir ($destdir, $job_work, $task_work) {
   if ($dir) {
     make_path $dir;
@@ -1906,11 +2001,6 @@ if ($task_work) {
   remove_tree($task_work, {keep_root => 1});
 }
 
-open(STDOUT_ORIG, ">&", STDOUT);
-open(STDERR_ORIG, ">&", STDERR);
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
-
 ### Crunch script run mode
 if (@ARGV) {
   # We want to do routine logging during task 0 only.  This gives the user
@@ -1971,10 +2061,6 @@ if (@ARGV) {
     }
   }
 
-  close(STDOUT);
-  close(STDERR);
-  open(STDOUT, ">&", STDOUT_ORIG);
-  open(STDERR, ">&", STDERR_ORIG);
   exec(@ARGV);
   die "Cannot exec `@ARGV`: $!";
 }
@@ -1997,18 +2083,23 @@ if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
 unlink "$destdir.archive_hash";
 mkdir $destdir;
 
-if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
-  die "Error launching 'tar -xC $destdir': $!";
-}
-# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
-# get SIGPIPE.  We must feed it data incrementally.
-my $tar_input;
-while (read(DATA, $tar_input, 65536)) {
-  print TARX $tar_input;
-}
-if(!close(TARX)) {
-  die "'tar -xC $destdir' exited $?: $!";
-}
+do {
+  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
+  local $SIG{PIPE} = "IGNORE";
+  warn "Extracting archive: $archive_hash\n";
+  if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+    die "Error launching 'tar -xC $destdir': $!";
+  }
+  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+  # get SIGPIPE.  We must feed it data incrementally.
+  my $tar_input;
+  while (read(DATA, $tar_input, 65536)) {
+    print TARX $tar_input;
+  }
+  if(!close(TARX)) {
+    die "'tar -xC $destdir' exited $?: $!";
+  }
+};
 
 mkdir $install_dir;
 
@@ -2034,6 +2125,11 @@ if ((-d $python_dir) and can_run("python2.7") and
   close($pysdk_cfg);
 }
 
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
 if (-e "$destdir/crunch_scripts/install") {
     shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {