8815: Crunch-job bind mounts crunchrunner binary and certificates from host.
[arvados.git] / sdk / cli / bin / crunch-job
index ca9db1dacdb162d73ee6eb53bb79d5ec2d550014..88f9d561b46c41b4f31ea2efc1cfc225869e453b 100755 (executable)
@@ -109,6 +109,9 @@ unless (defined $ENV{"CRUNCH_TMP"}) {
   }
 }
 
+$ENV{"HOST_CRUNCHRUNNER_BIN"} ||= `which crunchrunner`;
+$ENV{"HOST_CERTS"} ||= "/etc/ssl/certs/ca-certificates.crt";
+
 # Create the tmp directory if it does not exist
 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
   make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
@@ -126,6 +129,7 @@ my $jobspec;
 my $job_api_token;
 my $no_clear_tmp;
 my $resume_stash;
+my $cgroup_root = "/sys/fs/cgroup";
 my $docker_bin = "docker.io";
 my $docker_run_args = "";
 GetOptions('force-unlock' => \$force_unlock,
@@ -134,6 +138,7 @@ GetOptions('force-unlock' => \$force_unlock,
            'job-api-token=s' => \$job_api_token,
            'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
+           'cgroup-root=s' => \$cgroup_root,
            'docker-bin=s' => \$docker_bin,
            'docker-run-args=s' => \$docker_run_args,
     );
@@ -430,7 +435,7 @@ fi
 
   # Determine whether this version of Docker supports memory+swap limits.
   ($exited, $stdout, $stderr) = srun_sync(
-    ["srun", "--nodelist=" . $node[0]],
+    ["srun", "--nodes=1"],
     [$docker_bin, 'run', '--help'],
     {label => "check --memory-swap feature"});
   $docker_limitmem = ($stdout =~ /--memory-swap/);
@@ -453,7 +458,7 @@ fi
       $try_user_arg = "--user=$try_user";
     }
     my ($exited, $stdout, $stderr) = srun_sync(
-      ["srun", "--nodelist=" . $node[0]],
+      ["srun", "--nodes=1"],
       ["/bin/sh", "-ec",
        "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
       {label => $label});
@@ -860,7 +865,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
       my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
       $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
@@ -915,6 +920,11 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       # For now, use the same approach as TASK_WORK above.
       $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
 
+      # Bind mount the crunchrunner binary and host TLS certificates file into
+      # the container.
+      $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/lib/crunchrunner/crunchrunner\E ";
+      $command .= "--volume=\Q$ENV{HOST_CERTS}:/usr/lib/crunchrunner/ca-certificates.crt\E ";
+
       while (my ($env_key, $env_val) = each %ENV)
       {
         if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
@@ -940,7 +950,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
     } else {
       # Non-docker run
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
+      $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 ";
       $command .= $stdbuf;
       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
@@ -1132,7 +1142,9 @@ sub update_progress_stats
 sub reapchildren
 {
   my $children_reaped = 0;
-  while ((my $pid = waitpid (-1, WNOHANG)) > 0)
+  my @successful_task_uuids = ();
+
+  while((my $pid = waitpid (-1, WNOHANG)) > 0)
   {
     my $childstatus = $?;
 
@@ -1205,8 +1217,9 @@ sub reapchildren
       push @jobstep_todo, $jobstepidx;
       $Job->{'tasks_summary'}->{'failed'}++;
     }
-    else
+    else # task_success
     {
+      push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
       ++$thisround_succeeded;
       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
@@ -1229,34 +1242,36 @@ sub reapchildren
     push @freeslot, $proc{$pid}->{slot};
     delete $proc{$pid};
 
-    if ($task_success) {
-      # Load new tasks
-      my $newtask_list = [];
-      my $newtask_results;
-      do {
-        $newtask_results = api_call(
-          "job_tasks/list",
-          'where' => {
-            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-          },
-          'order' => 'qsequence',
-          'offset' => scalar(@$newtask_list),
-            );
-        push(@$newtask_list, @{$newtask_results->{items}});
-      } while (@{$newtask_results->{items}});
-      foreach my $arvados_task (@$newtask_list) {
-        my $jobstep = {
-          'level' => $arvados_task->{'sequence'},
-          'failures' => 0,
-          'arvados_task' => $arvados_task
-        };
-        push @jobstep, $jobstep;
-        push @jobstep_todo, $#jobstep;
-      }
-    }
     $progress_is_dirty = 1;
   }
 
+  if (scalar(@successful_task_uuids) > 0)
+  {
+    Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
+    # Load new tasks
+    my $newtask_list = [];
+    my $newtask_results;
+    do {
+      $newtask_results = api_call(
+        "job_tasks/list",
+        'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
+        'order' => 'qsequence',
+        'offset' => scalar(@$newtask_list),
+          );
+      push(@$newtask_list, @{$newtask_results->{items}});
+    } while (@{$newtask_results->{items}});
+    Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
+    foreach my $arvados_task (@$newtask_list) {
+      my $jobstep = {
+        'level' => $arvados_task->{'sequence'},
+        'failures' => 0,
+        'arvados_task' => $arvados_task
+      };
+      push @jobstep, $jobstep;
+      push @jobstep_todo, $#jobstep;
+    }
+  }
+
   return $children_reaped;
 }
 
@@ -1466,7 +1481,6 @@ sub preprocess_stderr
       # attached to a particular worker slot.
     }
     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
-      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
       my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
       $jobstep[$jobstepidx]->{tempfail} = 1;
@@ -1476,7 +1490,7 @@ sub preprocess_stderr
       $jobstep[$jobstepidx]->{tempfail} = 1;
       ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
     }
-    elsif ($line =~ /arvados\.errors\.Keep/) {
+    elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
     }
   }