$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});
+my %proc;
my $force_unlock;
my $git_dir;
my $jobspec;
my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
+my $docker_bin = "/usr/bin/docker.io";
GetOptions('force-unlock' => \$force_unlock,
'git-dir=s' => \$git_dir,
'job=s' => \$jobspec,
'job-api-token=s' => \$job_api_token,
'no-clear-tmp' => \$no_clear_tmp,
'resume-stash=s' => \$resume_stash,
+ 'docker-bin=s' => \$docker_bin,
);
if (defined $job_api_token) {
$ENV{ARVADOS_API_TOKEN} = $job_api_token;
}
-my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
-my $local_job = 0;
+my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};
$SIG{'USR1'} = sub
$main::ENV{CRUNCH_DEBUG} = 0;
};
-
-
my $arv = Arvados->new('apiVersion' => 'v1');
my $Job;
my $sth;
my @jobstep;
-my $User = api_call("users/current");
-
+my $local_job;
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
$Job = api_call("jobs/get", uuid => $jobspec);
+ $local_job = 0;
+}
+else
+{
+ $Job = JSON::decode_json($jobspec);
+ $local_job = 1;
+}
+
+
+# Make sure our workers (our slurm nodes, localhost, or whatever) are
+# at least able to run basic commands: they aren't down or severely
+# misconfigured.
+# Choose the probe command: `docker ps -q` when the job names a Docker
+# image, plain `true` otherwise.
+my $cmd = $Job->{docker_image_locator}
+  ? [$docker_bin, 'ps', '-q']
+  : ['true'];
+Log(undef, "Sanity check is `@$cmd`");
+# One task per node across the whole allocation.
+my @sanity_srun_args = ("srun",
+                        "--nodes=\Q$ENV{SLURM_NNODES}\E",
+                        "--ntasks-per-node=1");
+srun(\@sanity_srun_args, $cmd, {fork => 1});
+# The probe's exit status is read from $? once srun() has returned.
+unless ($? == 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($?));
+  exit EX_TEMPFAIL;
+}
+Log(undef, "Sanity check OK");
+
+
+my $User = api_call("users/current");
+
+if (!$local_job) {
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
}
else
{
- $Job = JSON::decode_json($jobspec);
-
if (!$resume_stash)
{
map { croak ("No $_ specified") unless $Job->{$_} }
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
-my $squeue_checked;
-my $squeue_kill_checked;
+my $squeue_checked = 0;
my $latest_refresh = scalar time;
}
# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
if ($docker_locator = $Job->{docker_image_locator}) {
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
}
}
else {
- Log(undef, "Run install script on all workers");
-
- my @srunargs = ("srun",
- "--nodelist=$nodelist",
- "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
- my @execargs = ("sh", "-c",
- "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+  # Exit status ($?) of the most recent install attempt.
+  my $install_exited;
+  # Number of install attempts we are still willing to make. The loop
+  # counts this down; `last` leaves early on success, on a freeze
+  # request, or when the install script itself printed something.
+  my $install_script_tries_left = 3;
+  for (; $install_script_tries_left > 0; $install_script_tries_left--) {
+    Log(undef, "Run install script on all workers");
+
+    my @srunargs = ("srun",
+                    "--nodelist=$nodelist",
+                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
+    my @execargs = ("sh", "-c",
+                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+
+    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+    my ($install_stderr_r, $install_stderr_w);
+    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+    set_nonblocking($install_stderr_r);
+    my $installpid = fork();
+    # fork() returns undef on failure; undef == 0 is true, so without this
+    # check the parent would fall into the child branch below and exit.
+    defined $installpid or croak("fork() failed: $!");
+    if ($installpid == 0)
+    {
+      close($install_stderr_r);
+      # NOTE(review): F_SETFL sets file *status* flags. Close-on-exec is a
+      # descriptor flag (F_SETFD / FD_CLOEXEC), so this call does not clear
+      # it. Presumably the pipe reaches the command run by srun() through
+      # the dups onto STDOUT/STDERR below -- confirm before relying on it.
+      fcntl($install_stderr_w, F_SETFL, 0) or croak($!);
+      open(STDOUT, ">&", $install_stderr_w);
+      open(STDERR, ">&", $install_stderr_w);
+      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
+      exit (1);
+    }
+    close($install_stderr_w);
+    # Tell freeze_if_want_freeze how to kill the child, otherwise the
+    # "waitpid(installpid)" loop won't get interrupted by a freeze:
+    $proc{$installpid} = {};
+    my $stderr_buf = '';
+    # Track whether anything appears on stderr other than slurm errors
+    # ("srun: ...") and the "starting: ..." message printed by the
+    # srun subroutine itself:
+    my $stderr_anything_from_script = 0;
+    my $match_our_own_errors = '^(srun: error: |starting: \[)';
+    # Move every complete line from $stderr_buf to the log, noting whether
+    # any of them is something other than our own/slurm's messages.
+    my $log_stderr_lines = sub {
+      while ($stderr_buf =~ /^(.*?)\n/) {
+        my $line = $1;
+        substr $stderr_buf, 0, 1+length($line), "";
+        Log(undef, "stderr $line");
+        if ($line !~ /$match_our_own_errors/) {
+          $stderr_anything_from_script = 1;
+        }
+      }
+    };
+    while ($installpid != waitpid(-1, WNOHANG)) {
+      freeze_if_want_freeze ($installpid);
+      # Wait up to 0.1 seconds for something to appear on stderr, then
+      # do a non-blocking read.
+      my $bits = fhbits($install_stderr_r);
+      select ($bits, undef, $bits, 0.1);
+      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+      {
+        $log_stderr_lines->();
+      }
+    }
+    # Save the child's status before anything else can overwrite $?.
+    $install_exited = $?;
+    delete $proc{$installpid};
+    # The child may have written its final lines (often the very error
+    # message that decides whether we retry) just before exiting, after
+    # our last read. The read end is non-blocking, so this stops as soon
+    # as the pipe is empty or at EOF.
+    while (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+    {
+      $log_stderr_lines->();
+    }
+    close($install_stderr_r);
+    # Whatever is left is a final line with no trailing newline.
+    if (length($stderr_buf) > 0) {
+      if ($stderr_buf !~ /$match_our_own_errors/) {
+        $stderr_anything_from_script = 1;
+      }
+      Log(undef, "stderr $stderr_buf");
+    }
- my $installpid = fork();
- if ($installpid == 0)
- {
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
+    Log (undef, "Install script exited ".exit_status_s($install_exited));
+    last if $install_exited == 0 || $main::please_freeze;
+    # If the install script fails but doesn't print an error message,
+    # the next thing anyone is likely to do is just run it again in
+    # case it was a transient problem like "slurm communication fails
+    # because the network isn't reliable enough". So we'll just do
+    # that ourselves (up to 3 attempts in total). OTOH, if there is an
+    # error message, the problem is more likely to have a real fix and
+    # we should fail the job so the fixing process can start, instead
+    # of doing 2 more attempts.
+    last if $stderr_anything_from_script;
}
- while (1)
- {
- last if $installpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($installpid);
- select (undef, undef, undef, 0.1);
- }
- my $install_exited = $?;
- Log (undef, "Install script exited ".exit_status_s($install_exited));
+
foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
unlink($tar_filename);
}
- exit (1) if $install_exited != 0;
+
+ if ($install_exited != 0) {
+ croak("Giving up");
+ }
}
foreach (qw (script script_version script_parameters runtime_constraints))
}
Log(undef, "start level $level with $round_num_freeslots slots");
-my %proc;
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
next;
}
- pipe $reader{$id}, "writer" or croak ($!);
- my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
- fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+ pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+ set_nonblocking($reader{$id});
my $childslot = $freeslot[0];
my $childnode = $slot[$childslot]->{node};
Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary ' : 'permanent',
+ $temporary_fail ? 'temporary' : 'permanent',
$elapsed));
if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+# Detect srun children whose slurm task has vanished, and kill them.
+#
+# Does nothing if called again within 15 seconds of the previous check, or
+# if every child has produced stderr output since that check. Children
+# older than 60 seconds that squeue does not list are given a kill
+# deadline 30 seconds ahead; on a later call, children whose deadline has
+# passed and which are still silent are killed with killem().
+# Takes no arguments; the return value is not meaningful.
sub check_squeue
{
- # return if the kill list was checked <4 seconds ago
- if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
- {
- return;
- }
- $squeue_kill_checked = time;
+  my $last_squeue_check = $squeue_checked;
- # use killem() on procs whose killtime is reached
- for (keys %proc)
+  # Do not call `squeue` or check the kill list more than once every
+  # 15 seconds.
+  return if $last_squeue_check > time - 15;
+  $squeue_checked = time;
+
+  # The stderr timestamp is stored on the @jobstep record (by the code
+  # that reads child stderr), not on the %proc entry, which only holds
+  # the jobstep index. Look it up through that index. Entries with no
+  # jobstep, or with no stderr seen yet, count as "never heard from" (0).
+  my $stderr_at_of = sub {
+    my $procinfo = shift;
+    return 0 if !defined $procinfo->{jobstep};
+    # Fetch the element first so a missing record is not autovivified.
+    my $js = $jobstep[$procinfo->{jobstep}];
+    return 0 if !$js;
+    return $js->{stderr_at} || 0;
+  };
+
+  # Look for children from which we haven't received stderr data since
+  # the last squeue check. If no such children exist, all procs are
+  # alive and there's no need to even look at squeue.
+  #
+  # As long as the crunchstat poll interval (10s) is shorter than the
+  # squeue check interval (15s) this should make the squeue check an
+  # infrequent event.
+  my $silent_procs = 0;
+  for my $procinfo (values %proc)
{
- if (exists $proc{$_}->{killtime}
- && $proc{$_}->{killtime} <= time)
+    if ($stderr_at_of->($procinfo) < $last_squeue_check)
{
- killem ($_);
+      $silent_procs++;
}
}
+  return if $silent_procs == 0;
- # return if the squeue was checked <60 seconds ago
- if (defined $squeue_checked && $squeue_checked > time - 60)
+  # use killem() on procs whose killtime is reached
+  while (my ($pid, $procinfo) = each %proc)
{
- return;
+    if (exists $procinfo->{killtime}
+        && $procinfo->{killtime} <= time
+        && $stderr_at_of->($procinfo) < $last_squeue_check)
+    {
+      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task disappeared from slurm queue)");
+      killem ($pid);
+    }
}
- $squeue_checked = time;
if (!$have_slurm)
{
}
# get a list of steps still running
- my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
- chop @squeue;
- if ($squeue[-1] ne "ok")
+  my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%i %j' --noheader`;
+  if ($? != 0)
{
+    Log(undef, "warning: squeue exit status $? ($!)");
return;
}
- pop @squeue;
+  # chomp, not chop: only strip a trailing newline if there is one.
+  chomp @squeue;
# which of my jobsteps are running, according to squeue?
my %ok;
{
if (/^(\d+)\.(\d+) (\S+)/)
{
- if ($1 eq $ENV{SLURM_JOBID})
+      if ($1 eq $ENV{SLURM_JOB_ID})
{
$ok{$3} = 1;
}
}
}
- # which of my active child procs (>60s old) were not mentioned by squeue?
- foreach (keys %proc)
+  # Check for child procs >60s old and not mentioned by squeue.
+  while (my ($pid, $procinfo) = each %proc)
{
- if ($proc{$_}->{time} < time - 60
- && !exists $ok{$proc{$_}->{jobstepname}}
- && !exists $proc{$_}->{killtime})
+    if ($procinfo->{time} < time - 60
+        && $procinfo->{jobstepname}
+        && !exists $ok{$procinfo->{jobstepname}}
+        && !exists $procinfo->{killtime})
{
- # kill this proc if it hasn't exited in 30 seconds
- $proc{$_}->{killtime} = time + 30;
+      # According to slurm, this task has ended (successfully or not)
+      # -- but our srun child hasn't exited. First we must wait (30
+      # seconds) in case this is just a race between communication
+      # channels. Then, if our srun child process still hasn't
+      # terminated, we'll conclude some slurm communication
+      # error/delay has caused the task to die without notifying srun,
+      # and we'll kill srun ourselves.
+      $procinfo->{killtime} = time + 30;
+      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
if ($have_slurm)
{
Log (undef, "release job allocation");
- system "scancel $ENV{SLURM_JOBID}";
+ system "scancel $ENV{SLURM_JOB_ID}";
}
}
while (0 < sysread ($reader{$job}, $buf, 8192))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
+ $jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
preprocess_stderr ($job);
if (length ($jobstep[$job]->{stderr}) > 16384)
my $show_cmd = Dumper($args);
$show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
$show_cmd =~ s/\n/ /g;
- warn "starting: $show_cmd\n";
+ if ($opts->{fork}) {
+ Log(undef, "starting: $show_cmd");
+ } else {
+ # This is a child process: parent is in charge of reading our
+ # stderr and copying it to Log() if needed.
+ warn "starting: $show_cmd\n";
+ }
if (defined $stdin) {
my $child = open STDIN, "-|";
return $tar_contents;
}
+# Put a filehandle into non-blocking mode, keeping its other status flags.
+#
+# $fh: an open filehandle (the callers pass the read end of a pipe).
+# Croaks, naming the fcntl call that failed, if the flags cannot be read
+# or written. Returns the result of the final fcntl call.
+sub set_nonblocking {
+  my $fh = shift;
+  # fcntl returns "0 but true" for a zero result, so `or` only fires on a
+  # real failure (undef).
+  my $flags = fcntl ($fh, F_GETFL, 0)
+    or croak ("fcntl(F_GETFL) failed: $!");
+  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK)
+    or croak ("fcntl(F_SETFL) failed: $!");
+}
+
__DATA__
#!/usr/bin/perl
#
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
for my $dir ($destdir, $job_work, $task_work) {
if ($dir) {
make_path $dir;
remove_tree($task_work, {keep_root => 1});
}
-open(STDOUT_ORIG, ">&", STDOUT);
-open(STDERR_ORIG, ">&", STDERR);
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
-
### Crunch script run mode
if (@ARGV) {
# We want to do routine logging during task 0 only. This gives the user
}
}
- close(STDOUT);
- close(STDERR);
- open(STDOUT, ">&", STDOUT_ORIG);
- open(STDERR, ">&", STDERR_ORIG);
exec(@ARGV);
die "Cannot exec `@ARGV`: $!";
}
### Installation mode
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
- # This version already installed -> nothing to do.
+if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
+ # This exact git archive (source + arvados sdk) is already installed
+ # here, so there's no need to reinstall it.
+
+ # We must consume our DATA section, though: otherwise the process
+ # feeding it to us will get SIGPIPE.
+ my $buf;
+ while (read(DATA, $buf, 65536)) { }
+
exit(0);
}
-unlink "$destdir.commit";
+unlink "$destdir.archive_hash";
mkdir $destdir;
-if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
- die "Error launching 'tar -xC $destdir': $!";
-}
-# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
-# get SIGPIPE. We must feed it data incrementally.
-my $tar_input;
-while (read(DATA, $tar_input, 65536)) {
- print TARX $tar_input;
-}
-if(!close(TARX)) {
- die "'tar -xC $destdir' exited $?: $!";
-}
+{
+  # SIGPIPE is ignored for the duration of this block, so a tar process
+  # that quits early shows up as a failed close() below instead of
+  # killing us. See perlipc(1).
+  local $SIG{PIPE} = "IGNORE";
+  warn "Extracting archive: $archive_hash\n";
+  # Why --ignore-zeros: the amount of NUL padding tar -A left in our
+  # combined archive varies with the length of the component archives.
+  # Without the flag, tar can exit before it has consumed all of stdin,
+  # and close() then fails because of the resulting SIGPIPE.
+  open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)
+    or die "Error launching 'tar -xC $destdir': $!";
+  # Feed tar in 64 KiB pieces. Handing it too much in a single write
+  # (> 4-5 MiB) makes it stop, and we get SIGPIPE.
+  my $chunk;
+  print TARX $chunk while read(DATA, $chunk, 65536);
+  close(TARX)
+    or die "'tar -xC $destdir' exited $?: $!";
+}
mkdir $install_dir;
close($pysdk_cfg);
}
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
if (-e "$destdir/crunch_scripts/install") {
shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
shell_or_die (undef, "./install.sh", $install_dir);
}
-if ($commit) {
- unlink "$destdir.commit.new";
- symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
- rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+if ($archive_hash) {
+ unlink "$destdir.archive_hash.new";
+ symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
+ rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
}
close L;