$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
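+# Child processes, keyed by pid; declared this early so the install-script
+# runner below can register its child for freeze_if_want_freeze.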
+my %proc;
my $force_unlock;
my $git_dir;
my $jobspec;
my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
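+# Default docker executable; can be overridden with the new --docker-bin option.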
+my $docker_bin = "/usr/bin/docker.io";
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job=s' => \$jobspec,
'job-api-token=s' => \$job_api_token,
'no-clear-tmp' => \$no_clear_tmp,
'resume-stash=s' => \$resume_stash,
+ 'docker-bin=s' => \$docker_bin,
);
if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
}
my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
$SIG{'USR1'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 1;
};
$SIG{'USR2'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 0;
};
-
-
my $arv = Arvados->new('apiVersion' => 'v1');
my $Job;
my $sth;
my @jobstep;
-my $User = api_call("users/current");
-
+my $local_job;
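+# Set below: 0 if $jobspec is the UUID of an existing Arvados job, 1 if it
+# is an inline JSON job specification.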
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
$Job = api_call("jobs/get", uuid => $jobspec);
+ $local_job = 0;
+}
+else
+{
+ $Job = JSON::decode_json($jobspec);
+ $local_job = 1;
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+my $cmd = ['true'];
+if ($Job->{docker_image_locator}) {
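+  # "docker ps -q" exits nonzero when the docker daemon is unreachable, so
+  # this also confirms docker itself is working on every node.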
+ $cmd = [$docker_bin, 'ps', '-q'];
+}
+Log(undef, "Sanity check is `@$cmd`");
+srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+ $cmd,
+ {fork => 1});
+if ($? != 0) {
+ Log(undef, "Sanity check failed: ".exit_status_s($?));
+ exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
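+# Only a job fetched from the API server can (and must) be locked there; an
+# inline job spec has nothing to lock yet.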
+if (!$local_job) {
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
}
else
{
- $Job = JSON::decode_json($jobspec);
-
if (!$resume_stash)
{
map { croak ("No $_ specified") unless $Job->{$_} }
  qw(script script_version script_parameters);
}
# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
if ($docker_locator = $Job->{docker_image_locator}) {
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
}
}
else {
- Log(undef, "Run install script on all workers");
-
- my @srunargs = ("srun",
- "--nodelist=$nodelist",
- "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
- my @execargs = ("sh", "-c",
- "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
-
- $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($install_stderr_r, $install_stderr_w);
- pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
- set_nonblocking($install_stderr_r);
- my $installpid = fork();
- if ($installpid == 0)
- {
- close($install_stderr_r);
- fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
- open(STDOUT, ">&", $install_stderr_w);
- open(STDERR, ">&", $install_stderr_w);
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- close($install_stderr_w);
- my $stderr_buf = '';
- while ($installpid != waitpid(-1, WNOHANG)) {
- freeze_if_want_freeze ($installpid);
- # Wait up to 0.1 seconds for something to appear on stderr, then
- # do a non-blocking read.
- my $bits = fhbits($install_stderr_r);
- select ($bits, undef, $bits, 0.1);
- if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+ my $install_exited;
+ my $install_script_tries_left = 3;
+ for (my $attempts = 0; $attempts < $install_script_tries_left; $attempts++) {
+ Log(undef, "Run install script on all workers");
+
+ my @srunargs = ("srun",
+ "--nodelist=$nodelist",
+ "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+ my @execargs = ("sh", "-c",
+ "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+ $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+ my ($install_stderr_r, $install_stderr_w);
+ pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+ set_nonblocking($install_stderr_r);
+ my $installpid = fork();
+ if ($installpid == 0)
{
- while ($stderr_buf =~ /^(.*?)\n/) {
- my $line = $1;
- substr $stderr_buf, 0, 1+length($line), "";
- Log(undef, "stderr $line");
+ close($install_stderr_r);
+ fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+ open(STDOUT, ">&", $install_stderr_w);
+ open(STDERR, ">&", $install_stderr_w);
+ srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+ exit (1);
+ }
+ close($install_stderr_w);
+ # Tell freeze_if_want_freeze how to kill the child, otherwise the
+ # "waitpid(installpid)" loop won't get interrupted by a freeze:
+ $proc{$installpid} = {};
+ my $stderr_buf = '';
+ # Track whether anything appears on stderr other than slurm errors
+ # ("srun: ...") and the "starting: ..." message printed by the
+ # srun subroutine itself:
+ my $stderr_anything_from_script = 0;
+ my $match_our_own_errors = '^(srun: error: |starting: \[)';
+ while ($installpid != waitpid(-1, WNOHANG)) {
+ freeze_if_want_freeze ($installpid);
+ # Wait up to 0.1 seconds for something to appear on stderr, then
+ # do a non-blocking read.
+ my $bits = fhbits($install_stderr_r);
+ select ($bits, undef, $bits, 0.1);
+ if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+ {
+ while ($stderr_buf =~ /^(.*?)\n/) {
+ my $line = $1;
+ substr $stderr_buf, 0, 1+length($line), "";
+ Log(undef, "stderr $line");
+ if ($line !~ /$match_our_own_errors/) {
+ $stderr_anything_from_script = 1;
+ }
+ }
}
}
- }
- my $install_exited = $?;
- close($install_stderr_r);
- if (length($stderr_buf) > 0) {
- Log(undef, "stderr $stderr_buf")
+ delete $proc{$installpid};
+ $install_exited = $?;
+ close($install_stderr_r);
+ if (length($stderr_buf) > 0) {
+ if ($stderr_buf !~ /$match_our_own_errors/) {
+ $stderr_anything_from_script = 1;
+ }
+ Log(undef, "stderr $stderr_buf")
+ }
+
+ Log (undef, "Install script exited ".exit_status_s($install_exited));
+ last if $install_exited == 0 || $main::please_freeze;
+ # If the install script fails but doesn't print an error message,
+ # the next thing anyone is likely to do is just run it again in
+ # case it was a transient problem like "slurm communication fails
+ # because the network isn't reliable enough". So we'll just do
+ # that ourselves (up to 3 attempts in total). OTOH, if there is an
+ # error message, the problem is more likely to have a real fix and
+ # we should fail the job so the fixing process can start, instead
+ # of doing 2 more attempts.
+ last if $stderr_anything_from_script;
}
- Log (undef, "Install script exited ".exit_status_s($install_exited));
foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
unlink($tar_filename);
}
- exit (1) if $install_exited != 0;
+
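+  # None of the attempts succeeded: give up on the whole job.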
+ if ($install_exited != 0) {
+ croak("Giving up");
+ }
}
foreach (qw (script script_version script_parameters runtime_constraints))
}
Log(undef, "start level $level with $round_num_freeslots slots");
-my %proc;
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                         ++$Jobstep->{'failures'},
-                        $temporary_fail ? 'temporary ' : 'permanent',
+                        $temporary_fail ? 'temporary' : 'permanent',
                         $elapsed));
if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
my $show_cmd = Dumper($args);
$show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
$show_cmd =~ s/\n/ /g;
- warn "starting: $show_cmd\n";
+ if ($opts->{fork}) {
+ Log(undef, "starting: $show_cmd");
+ } else {
+ # This is a child process: parent is in charge of reading our
+ # stderr and copying it to Log() if needed.
+ warn "starting: $show_cmd\n";
+ }
if (defined $stdin) {
my $child = open STDIN, "-|";
unlink "$destdir.archive_hash";
mkdir $destdir;
-if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
- die "Error launching 'tar -xC $destdir': $!";
-}
-# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
-# get SIGPIPE. We must feed it data incrementally.
-my $tar_input;
-while (read(DATA, $tar_input, 65536)) {
- print TARX $tar_input;
-}
-if(!close(TARX)) {
- die "'tar -xC $destdir' exited $?: $!";
-}
+do {
+ # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
+ local $SIG{PIPE} = "IGNORE";
+ warn "Extracting archive: $archive_hash\n";
+ if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
+ die "Error launching 'tar -xC $destdir': $!";
+ }
+ # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
+ # get SIGPIPE. We must feed it data incrementally.
+ my $tar_input;
+ while (read(DATA, $tar_input, 65536)) {
+ print TARX $tar_input;
+ }
+ if(!close(TARX)) {
+ die "'tar -xC $destdir' exited $?: $!";
+ }
+};
mkdir $install_dir;