my @execargs = ("sh", "-c",
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
+ $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
+ my ($install_stderr_r, $install_stderr_w);
+ pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
+ set_nonblocking($install_stderr_r);
my $installpid = fork();
if ($installpid == 0)
{
+ close($install_stderr_r);
+ fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+ open(STDOUT, ">&", $install_stderr_w);
+ open(STDERR, ">&", $install_stderr_w);
srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
exit (1);
}
- while (1)
- {
- last if $installpid == waitpid (-1, WNOHANG);
+ close($install_stderr_w);
+ my $stderr_buf = '';
+ while ($installpid != waitpid(-1, WNOHANG)) {
freeze_if_want_freeze ($installpid);
- select (undef, undef, undef, 0.1);
+ # Wait up to 0.1 seconds for something to appear on stderr, then
+ # do a non-blocking read.
+ my $bits = fhbits($install_stderr_r);
+ select ($bits, undef, $bits, 0.1);
+ if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
+ {
+ while ($stderr_buf =~ /^(.*?)\n/) {
+ my $line = $1;
+ substr $stderr_buf, 0, 1+length($line), "";
+ Log(undef, "stderr $line");
+ }
+ }
}
my $install_exited = $?;
+ close($install_stderr_r);
+ if (length($stderr_buf) > 0) {
+ Log(undef, "stderr $stderr_buf")
+ }
+
Log (undef, "Install script exited ".exit_status_s($install_exited));
foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
unlink($tar_filename);
@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};
-Log (undef, "start level $level");
+my $initial_tasks_this_level = 0;
+foreach my $id (@jobstep_todo) {
+ $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
+}
+
+# If the number of tasks scheduled at this level #T is smaller than the number
+# of slots available #S, only use the first #T slots, or the first slot on
+# each node, whichever number is greater.
+#
+# When we dispatch tasks later, we'll allocate whole-node resources like RAM
+# based on these numbers. Using fewer slots makes more resources available
+# to each individual task, which should normally be a better strategy when
+# there are fewer of them running with less parallelism.
+#
+# Note that this calculation is not redone if the initial tasks at
+# this level queue more tasks at the same level. This may harm
+# overall task throughput for that level.
+my @freeslot;
+if ($initial_tasks_this_level < @node) {
+ @freeslot = (0..$#node);
+} elsif ($initial_tasks_this_level < @slot) {
+ @freeslot = (0..$initial_tasks_this_level - 1);
+} else {
+ @freeslot = (0..$#slot);
+}
+my $round_num_freeslots = scalar(@freeslot);
+my %round_max_slots = ();
+for (my $ii = $#freeslot; $ii >= 0; $ii--) {
+ my $this_slot = $slot[$freeslot[$ii]];
+ my $node_name = $this_slot->{node}->{name};
+ $round_max_slots{$node_name} ||= $this_slot->{cpu};
+ last if (scalar(keys(%round_max_slots)) >= @node);
+}
+Log(undef, "start level $level with $round_num_freeslots slots");
my %proc;
-my @freeslot = (0..$#slot);
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
update_progress_stats();
-
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
next;
}
- pipe $reader{$id}, "writer" or croak ($!);
- my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
- fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+ pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
+ set_nonblocking($reader{$id});
my $childslot = $freeslot[0];
my $childnode = $slot[$childslot]->{node};
$ENV{"HOME"} = $ENV{"TASK_WORK"};
$ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
- $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
+ $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
$ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
$ENV{"GZIP"} = "-n";
while (!@freeslot
||
- (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
+ ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
last THISROUND if $main::please_freeze || defined($main::success);
if ($main::please_info)
return $tar_contents;
}
+sub set_nonblocking {
+ my $fh = shift;
+ my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
+ fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
+}
+
__DATA__
#!/usr/bin/perl
#
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $destdir = $ENV{"CRUNCH_SRC"};
-my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
+my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+
for my $dir ($destdir, $job_work, $task_work) {
if ($dir) {
make_path $dir;
remove_tree($task_work, {keep_root => 1});
}
-open(STDOUT_ORIG, ">&", STDOUT);
-open(STDERR_ORIG, ">&", STDERR);
-open(STDOUT, ">>", "$destdir.log");
-open(STDERR, ">&", STDOUT);
-
### Crunch script run mode
if (@ARGV) {
# We want to do routine logging during task 0 only. This gives the user
}
}
- close(STDOUT);
- close(STDERR);
- open(STDOUT, ">&", STDOUT_ORIG);
- open(STDERR, ">&", STDERR_ORIG);
exec(@ARGV);
die "Cannot exec `@ARGV`: $!";
}
### Installation mode
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
-if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
- # This version already installed -> nothing to do.
+if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
+ # This exact git archive (source + arvados sdk) is already installed
+ # here, so there's no need to reinstall it.
+
+ # We must consume our DATA section, though: otherwise the process
+ # feeding it to us will get SIGPIPE.
+ my $buf;
+ while (read(DATA, $buf, 65536)) { }
+
exit(0);
}
-unlink "$destdir.commit";
+unlink "$destdir.archive_hash";
mkdir $destdir;
if (!open(TARX, "|-", "tar", "-xC", $destdir)) {
close($pysdk_cfg);
}
+# Hide messages from the install script (unless it fails: shell_or_die
+# will show $destdir.log in that case).
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
if (-e "$destdir/crunch_scripts/install") {
shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
shell_or_die (undef, "./install.sh", $install_dir);
}
-if ($commit) {
- unlink "$destdir.commit.new";
- symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
- rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
+if ($archive_hash) {
+ unlink "$destdir.archive_hash.new";
+ symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
+ rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
}
close L;