}
}
+# Locate the crunchrunner binary on the host unless a path was already
+# supplied through the environment. Backticks return the command's output
+# including its trailing newline, so strip it before storing the path.
+unless ($ENV{"HOST_CRUNCHRUNNER_BIN"}) {
+  my $crunchrunner_path = `which crunchrunner`;
+  # Backticks yield undef if the command could not be run at all.
+  $crunchrunner_path = "" unless defined($crunchrunner_path);
+  chomp($crunchrunner_path);
+  $ENV{"HOST_CRUNCHRUNNER_BIN"} = $crunchrunner_path;
+}
+# Unless HOST_CERTS is already defined, use the first CA bundle that exists
+# among the two locations checked below. HOST_CERTS remains undefined when
+# neither file is present.
+unless (defined($ENV{"HOST_CERTS"})) {
+  if (-f "/etc/ssl/certs/ca-certificates.crt") {
+    $ENV{"HOST_CERTS"} = "/etc/ssl/certs/ca-certificates.crt";
+  } elsif (-f "/etc/pki/tls/certs/ca-bundle.crt") {
+    $ENV{"HOST_CERTS"} = "/etc/pki/tls/certs/ca-bundle.crt";
+  }
+}
+
# Create the tmp directory if it does not exist
if ( ! -d $ENV{"CRUNCH_TMP"} ) {
make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Determine whether this version of Docker supports memory+swap limits.
($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
[$docker_bin, 'run', '--help'],
{label => "check --memory-swap feature"});
$docker_limitmem = ($stdout =~ /--memory-swap/);
$try_user_arg = "--user=$try_user";
}
my ($exited, $stdout, $stderr) = srun_sync(
- ["srun", "--nodelist=" . $node[0]],
+ ["srun", "--nodes=1"],
["/bin/sh", "-ec",
"$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
{label => $label});
# For now, use the same approach as TASK_WORK above.
$ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+ # Bind mount the crunchrunner binary and host TLS certificates file into
+ # the container.
+ $command .= "--volume=\Q$ENV{HOST_CRUNCHRUNNER_BIN}:/usr/local/bin/crunchrunner\E ";
+ $command .= "--volume=\Q$ENV{HOST_CERTS}:/etc/arvados/ca-certificates.crt\E ";
+
while (my ($env_key, $env_val) = each %ENV)
{
if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
sub reapchildren
{
my $children_reaped = 0;
- while ((my $pid = waitpid (-1, WNOHANG)) > 0)
+ my @successful_task_uuids = ();
+
+ while((my $pid = waitpid (-1, WNOHANG)) > 0)
{
my $childstatus = $?;
push @jobstep_todo, $jobstepidx;
$Job->{'tasks_summary'}->{'failed'}++;
}
- else
+ else # task_success
{
+ push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid};
++$thisround_succeeded;
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
- if ($task_success) {
- # Load new tasks
- my $newtask_list = [];
- my $newtask_results;
- do {
- $newtask_results = api_call(
- "job_tasks/list",
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- push(@$newtask_list, @{$newtask_results->{items}});
- } while (@{$newtask_results->{items}});
- foreach my $arvados_task (@$newtask_list) {
- my $jobstep = {
- 'level' => $arvados_task->{'sequence'},
- 'failures' => 0,
- 'arvados_task' => $arvados_task
- };
- push @jobstep, $jobstep;
- push @jobstep_todo, $#jobstep;
- }
- }
$progress_is_dirty = 1;
}
+ if (scalar(@successful_task_uuids) > 0)
+ {
+ Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids)));
+ # Load new tasks
+ my $newtask_list = [];
+ my $newtask_results;
+ do {
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]],
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ push(@$newtask_list, @{$newtask_results->{items}});
+ } while (@{$newtask_results->{items}});
+ Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list)));
+ foreach my $arvados_task (@$newtask_list) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'failures' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
+ }
+
return $children_reaped;
}