X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e1456a12e9b2daa5a7cae0daa7a952d71dd2dd1b..53b19718f974e7c9014644ce80fa36363ae0b693:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 912159a36f..c748904105 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -597,19 +597,43 @@ else { my @execargs = ("sh", "-c", "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); + $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); + my ($install_stderr_r, $install_stderr_w); + pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!"); + set_nonblocking($install_stderr_r); my $installpid = fork(); if ($installpid == 0) { + close($install_stderr_r); + fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec + open(STDOUT, ">&", $install_stderr_w); + open(STDERR, ">&", $install_stderr_w); srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); exit (1); } - while (1) - { - last if $installpid == waitpid (-1, WNOHANG); + close($install_stderr_w); + my $stderr_buf = ''; + while ($installpid != waitpid(-1, WNOHANG)) { freeze_if_want_freeze ($installpid); - select (undef, undef, undef, 0.1); + # Wait up to 0.1 seconds for something to appear on stderr, then + # do a non-blocking read. + my $bits = fhbits($install_stderr_r); + select ($bits, undef, $bits, 0.1); + if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf))) + { + while ($stderr_buf =~ /^(.*?)\n/) { + my $line = $1; + substr $stderr_buf, 0, 1+length($line), ""; + Log(undef, "stderr $line"); + } + } } my $install_exited = $?; + close($install_stderr_r); + if (length($stderr_buf) > 0) { + Log(undef, "stderr $stderr_buf") + } + Log (undef, "Install script exited ".exit_status_s($install_exited)); foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) { unlink($tar_filename); @@ -643,12 +667,44 @@ my $thisround_failed_multiple = 0; @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; my $level = $jobstep[$jobstep_todo[0]]->{level}; -Log (undef, "start level $level"); +my $initial_tasks_this_level = 0; +foreach my $id (@jobstep_todo) { + $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level); +} + +# If the number of tasks scheduled at this level #T is smaller than the number +# of slots available #S, only use the first #T slots, or the first slot on +# each node, whichever number is greater. +# +# When we dispatch tasks later, we'll allocate whole-node resources like RAM +# based on these numbers. Using fewer slots makes more resources available +# to each individual task, which should normally be a better strategy when +# there are fewer of them running with less parallelism. +# +# Note that this calculation is not redone if the initial tasks at +# this level queue more tasks at the same level. This may harm +# overall task throughput for that level. +my @freeslot; +if ($initial_tasks_this_level < @node) { + @freeslot = (0..$#node); +} elsif ($initial_tasks_this_level < @slot) { + @freeslot = (0..$initial_tasks_this_level - 1); +} else { + @freeslot = (0..$#slot); +} +my $round_num_freeslots = scalar(@freeslot); +my %round_max_slots = (); +for (my $ii = $#freeslot; $ii >= 0; $ii--) { + my $this_slot = $slot[$freeslot[$ii]]; + my $node_name = $this_slot->{node}->{name}; + $round_max_slots{$node_name} ||= $this_slot->{cpu}; + last if (scalar(keys(%round_max_slots)) >= @node); +} +Log(undef, "start level $level with $round_num_freeslots slots"); my %proc; -my @freeslot = (0..$#slot); my @holdslot; my %reader; my $progress_is_dirty = 1; @@ -657,7 +713,6 @@ my $progress_stats_updated = 0; update_progress_stats(); - THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { @@ -668,9 +723,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) next; } - pipe $reader{$id}, "writer" or croak ($!); - my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); - fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); + pipe $reader{$id}, "writer" or croak("pipe() failed: $!"); + set_nonblocking($reader{$id}); my $childslot = $freeslot[0]; my $childnode = $slot[$childslot]->{node}; @@ -711,7 +765,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $ENV{"HOME"} = $ENV{"TASK_WORK"}; $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated - $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; + $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}}; $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; $ENV{"GZIP"} = "-n"; @@ -850,7 +904,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) while (!@freeslot || - (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) + ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { last THISROUND if $main::please_freeze || defined($main::success); if ($main::please_info) @@ -1830,6 +1884,12 @@ sub combined_git_archive { return $tar_contents; } +sub set_nonblocking { + my $fh = shift; + my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!); + fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!); +} + __DATA__ #!/usr/bin/perl # @@ -1857,12 +1917,15 @@ use constant TASK_TEMPFAIL => 111; my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB"); my $destdir = $ENV{"CRUNCH_SRC"}; -my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; +my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"}; my $repo = $ENV{"CRUNCH_SRC_URL"}; my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt"); my $job_work = $ENV{"JOB_WORK"}; my $task_work = $ENV{"TASK_WORK"}; +open(STDOUT_ORIG, ">&", STDOUT); +open(STDERR_ORIG, ">&", STDERR); + for my $dir ($destdir, $job_work, $task_work) { if ($dir) { make_path $dir; @@ -1874,11 +1937,6 @@ if ($task_work) { remove_tree($task_work, {keep_root => 1}); } -open(STDOUT_ORIG, ">&", STDOUT); -open(STDERR_ORIG, ">&", STDERR); -open(STDOUT, ">>", "$destdir.log"); -open(STDERR, ">&", STDOUT); - ### Crunch script run mode if (@ARGV) { # We want to do routine logging during task 0 only. This gives the user @@ -1939,10 +1997,6 @@ if (@ARGV) { } } - close(STDOUT); - close(STDERR); - open(STDOUT, ">&", STDOUT_ORIG); - open(STDERR, ">&", STDERR_ORIG); exec(@ARGV); die "Cannot exec `@ARGV`: $!"; } @@ -1950,12 +2004,19 @@ if (@ARGV) { ### Installation mode open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; -if (readlink ("$destdir.commit") eq $commit && -d $destdir) { - # This version already installed -> nothing to do. +if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) { + # This exact git archive (source + arvados sdk) is already installed + # here, so there's no need to reinstall it. + + # We must consume our DATA section, though: otherwise the process + # feeding it to us will get SIGPIPE. + my $buf; + while (read(DATA, $buf, 65536)) { } + exit(0); } -unlink "$destdir.commit"; +unlink "$destdir.archive_hash"; mkdir $destdir; if (!open(TARX, "|-", "tar", "-xC", $destdir)) { @@ -1995,6 +2056,11 @@ if ((-d $python_dir) and can_run("python2.7") and close($pysdk_cfg); } +# Hide messages from the install script (unless it fails: shell_or_die +# will show $destdir.log in that case). +open(STDOUT, ">>", "$destdir.log"); +open(STDERR, ">&", STDOUT); + if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir); } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") { @@ -2004,10 +2070,10 @@ if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "./install.sh", $install_dir); } -if ($commit) { - unlink "$destdir.commit.new"; - symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!"; - rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!"; +if ($archive_hash) { + unlink "$destdir.archive_hash.new"; + symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!"; + rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!"; } close L;