X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d843787b4ece9952597d7814cbf10fb383c72625..e359c70eb7adb66df7c6aae6edb738e5f543d6e4:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 294696cdcf..2246c86fb6 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree ); use constant TASK_TEMPFAIL => 111; use constant EX_TEMPFAIL => 75; +use constant EX_RETRY_UNLOCKED => 93; $ENV{"TMPDIR"} ||= "/tmp"; unless (defined $ENV{"CRUNCH_TMP"}) { @@ -118,26 +119,28 @@ $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated mkdir ($ENV{"JOB_WORK"}); +my %proc; my $force_unlock; my $git_dir; my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; +my $docker_bin = "/usr/bin/docker.io"; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, 'job-api-token=s' => \$job_api_token, 'no-clear-tmp' => \$no_clear_tmp, 'resume-stash=s' => \$resume_stash, + 'docker-bin=s' => \$docker_bin, ); if (defined $job_api_token) { $ENV{ARVADOS_API_TOKEN} = $job_api_token; } -my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; -my $local_job = 0; +my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST}; $SIG{'USR1'} = sub @@ -149,8 +152,6 @@ $SIG{'USR2'} = sub $main::ENV{CRUNCH_DEBUG} = 0; }; - - my $arv = Arvados->new('apiVersion' => 'v1'); my $Job; @@ -159,12 +160,41 @@ my $dbh; my $sth; my @jobstep; -my $User = api_call("users/current"); - +my $local_job; if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification $Job = api_call("jobs/get", uuid => $jobspec); + $local_job = 0; +} +else +{ + $Job = JSON::decode_json($jobspec); + $local_job = 1; +} + + +# Make sure our workers (our slurm nodes, localhost, or whatever) are +# at least able to run basic commands: they aren't down or severely +# misconfigured. +my $cmd = ['true']; +if ($Job->{docker_image_locator}) { + $cmd = [$docker_bin, 'ps', '-q']; +} +Log(undef, "Sanity check is `@$cmd`"); +srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], + $cmd, + {fork => 1}); +if ($? != 0) { + Log(undef, "Sanity check failed: ".exit_status_s($?)); + exit EX_TEMPFAIL; +} +Log(undef, "Sanity check OK"); + + +my $User = api_call("users/current"); + +if (!$local_job) { if (!$force_unlock) { # Claim this job, and make sure nobody else does eval { api_call("jobs/lock", uuid => $Job->{uuid}); }; @@ -176,8 +206,6 @@ if ($jobspec =~ /^[-a-z\d]+$/) } else { - $Job = JSON::decode_json($jobspec); - if (!$resume_stash) { map { croak ("No $_ specified") unless $Job->{$_} } @@ -315,8 +343,7 @@ my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; -my $squeue_checked; -my $squeue_kill_checked; +my $squeue_checked = 0; my $latest_refresh = scalar time; @@ -375,8 +402,7 @@ if (!defined $no_clear_tmp) { } # If this job requires a Docker image, install that. -my $docker_bin = "/usr/bin/docker.io"; -my ($docker_locator, $docker_stream, $docker_hash); +my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem); if ($docker_locator = $Job->{docker_image_locator}) { ($docker_stream, $docker_hash) = find_docker_image($docker_locator); if (!$docker_hash) @@ -408,6 +434,12 @@ fi .exit_status_s($?)); } + # Determine whether this version of Docker supports memory+swap limits. + srun(["srun", "--nodelist=" . $node[0]], + ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="], + {fork => 1}); + $docker_limitmem = ($? == 0); + if ($Job->{arvados_sdk_version}) { # The job also specifies an Arvados SDK version. Add the SDKs to the # tar file for the build script to install. @@ -583,32 +615,89 @@ if (!defined $git_archive) { } } else { - Log(undef, "Run install script on all workers"); - - my @srunargs = ("srun", - "--nodelist=$nodelist", - "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); - my @execargs = ("sh", "-c", - "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); + my $install_exited; + my $install_script_tries_left = 3; + for (my $attempts = 0; $attempts < 3; $attempts++) { + Log(undef, "Run install script on all workers"); + + my @srunargs = ("srun", + "--nodelist=$nodelist", + "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); + my @execargs = ("sh", "-c", + "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); + + $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); + my ($install_stderr_r, $install_stderr_w); + pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!"); + set_nonblocking($install_stderr_r); + my $installpid = fork(); + if ($installpid == 0) + { + close($install_stderr_r); + fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec + open(STDOUT, ">&", $install_stderr_w); + open(STDERR, ">&", $install_stderr_w); + srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); + exit (1); + } + close($install_stderr_w); + # Tell freeze_if_want_freeze how to kill the child, otherwise the + # "waitpid(installpid)" loop won't get interrupted by a freeze: + $proc{$installpid} = {}; + my $stderr_buf = ''; + # Track whether anything appears on stderr other than slurm errors + # ("srun: ...") and the "starting: ..." message printed by the + # srun subroutine itself: + my $stderr_anything_from_script = 0; + my $match_our_own_errors = '^(srun: error: |starting: \[)'; + while ($installpid != waitpid(-1, WNOHANG)) { + freeze_if_want_freeze ($installpid); + # Wait up to 0.1 seconds for something to appear on stderr, then + # do a non-blocking read. + my $bits = fhbits($install_stderr_r); + select ($bits, undef, $bits, 0.1); + if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf))) + { + while ($stderr_buf =~ /^(.*?)\n/) { + my $line = $1; + substr $stderr_buf, 0, 1+length($line), ""; + Log(undef, "stderr $line"); + if ($line !~ /$match_our_own_errors/) { + $stderr_anything_from_script = 1; + } + } + } + } + delete $proc{$installpid}; + $install_exited = $?; + close($install_stderr_r); + if (length($stderr_buf) > 0) { + if ($stderr_buf !~ /$match_our_own_errors/) { + $stderr_anything_from_script = 1; + } + Log(undef, "stderr $stderr_buf") + } - my $installpid = fork(); - if ($installpid == 0) - { - srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); - exit (1); - } - while (1) - { - last if $installpid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($installpid); - select (undef, undef, undef, 0.1); + Log (undef, "Install script exited ".exit_status_s($install_exited)); + last if $install_exited == 0 || $main::please_freeze; + # If the install script fails but doesn't print an error message, + # the next thing anyone is likely to do is just run it again in + # case it was a transient problem like "slurm communication fails + # because the network isn't reliable enough". So we'll just do + # that ourselves (up to 3 attempts in total). OTOH, if there is an + # error message, the problem is more likely to have a real fix and + # we should fail the job so the fixing process can start, instead + # of doing 2 more attempts. + last if $stderr_anything_from_script; } - my $install_exited = $?; - Log (undef, "Install script exited ".exit_status_s($install_exited)); + foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) { unlink($tar_filename); } - exit (1) if $install_exited != 0; + + if ($install_exited != 0) { + croak("Giving up"); + } } foreach (qw (script script_version script_parameters runtime_constraints)) @@ -633,16 +722,48 @@ ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; +my $working_slot_count = scalar(@slot); @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; my $level = $jobstep[$jobstep_todo[0]]->{level}; -Log (undef, "start level $level"); +my $initial_tasks_this_level = 0; +foreach my $id (@jobstep_todo) { + $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level); +} +# If the number of tasks scheduled at this level #T is smaller than the number +# of slots available #S, only use the first #T slots, or the first slot on +# each node, whichever number is greater. +# +# When we dispatch tasks later, we'll allocate whole-node resources like RAM +# based on these numbers. Using fewer slots makes more resources available +# to each individual task, which should normally be a better strategy when +# there are fewer of them running with less parallelism. +# +# Note that this calculation is not redone if the initial tasks at +# this level queue more tasks at the same level. This may harm +# overall task throughput for that level. +my @freeslot; +if ($initial_tasks_this_level < @node) { + @freeslot = (0..$#node); +} elsif ($initial_tasks_this_level < @slot) { + @freeslot = (0..$initial_tasks_this_level - 1); +} else { + @freeslot = (0..$#slot); +} +my $round_num_freeslots = scalar(@freeslot); -my %proc; -my @freeslot = (0..$#slot); +my %round_max_slots = (); +for (my $ii = $#freeslot; $ii >= 0; $ii--) { + my $this_slot = $slot[$freeslot[$ii]]; + my $node_name = $this_slot->{node}->{name}; + $round_max_slots{$node_name} ||= $this_slot->{cpu}; + last if (scalar(keys(%round_max_slots)) >= @node); +} + +Log(undef, "start level $level with $round_num_freeslots slots"); my @holdslot; my %reader; my $progress_is_dirty = 1; @@ -651,7 +772,6 @@ my $progress_stats_updated = 0; update_progress_stats(); - THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { @@ -662,9 +782,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) next; } - pipe $reader{$id}, "writer" or croak ($!); - my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); - fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); + pipe $reader{$id}, "writer" or croak("pipe() failed: $!"); + set_nonblocking($reader{$id}); my $childslot = $freeslot[0]; my $childnode = $slot[$childslot]->{node}; @@ -705,7 +824,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $ENV{"HOME"} = $ENV{"TASK_WORK"}; $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated - $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; + $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}}; $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; $ENV{"GZIP"} = "-n"; @@ -720,14 +839,25 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " ."&& cd $ENV{CRUNCH_TMP} " - ."&& MEM=\$(cat /proc/meminfo | grep MemTotal | sed 's/\\s\\s*/ /g' |cut -d' ' -f2) " - ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "; + # These environment variables get used explicitly later in + # $command. No tool is expected to read these values directly. + .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' @freeslot && $todo_ptr+1 > $#jobstep_todo)) + ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { last THISROUND if $main::please_freeze || defined($main::success); if ($main::please_info) @@ -858,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { update_progress_stats(); } + $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 && + $_->{node}->{hold_count} < 4 } @slot); if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { @@ -881,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } # give up if no nodes are succeeding - if (!grep { $_->{node}->{losing_streak} == 0 && - $_->{node}->{hold_count} < 4 } @slot) { - my $message = "Every node has failed -- giving up on this round"; - Log (undef, $message); + if ($working_slot_count < 1) { + Log(undef, "Every node has failed -- giving up"); last THISROUND; } } @@ -920,18 +1050,18 @@ freeze_if_want_freeze(); if (!defined $main::success) { - if (@jobstep_todo && - $thisround_succeeded == 0 && - ($thisround_failed == 0 || $thisround_failed > 4)) - { + if (!@jobstep_todo) { + $main::success = 1; + } elsif ($working_slot_count < 1) { + save_output_collection(); + save_meta(); + exit(EX_RETRY_UNLOCKED); + } elsif ($thisround_succeeded == 0 && + ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $main::success = 0; } - if (!@jobstep_todo) - { - $main::success = 1; - } } goto ONELEVEL if !defined $main::success; @@ -939,16 +1069,7 @@ goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -my $collated_output = &create_output_collection(); - -if (!$collated_output) { - Log (undef, "Failed to write output collection"); -} -else { - Log(undef, "job output $collated_output"); - $Job->update_attributes('output' => $collated_output); -} - +my $collated_output = save_output_collection(); Log (undef, "finish"); save_meta(); @@ -1035,7 +1156,7 @@ sub reapchildren Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary ' : 'permanent', + $temporary_fail ? 'temporary' : 'permanent', $elapsed)); if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { @@ -1125,29 +1246,45 @@ sub check_refresh_wanted sub check_squeue { - # return if the kill list was checked <4 seconds ago - if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4) - { - return; - } - $squeue_kill_checked = time; + my $last_squeue_check = $squeue_checked; - # use killem() on procs whose killtime is reached - for (keys %proc) + # Do not call `squeue` or check the kill list more than once every + # 15 seconds. + return if $last_squeue_check > time - 15; + $squeue_checked = time; + + # Look for children from which we haven't received stderr data since + # the last squeue check. If no such children exist, all procs are + # alive and there's no need to even look at squeue. + # + # As long as the crunchstat poll interval (10s) is shorter than the + # squeue check interval (15s) this should make the squeue check an + # infrequent event. + my $silent_procs = 0; + for my $jobstep (values %proc) { - if (exists $proc{$_}->{killtime} - && $proc{$_}->{killtime} <= time) + if ($jobstep->{stderr_at} < $last_squeue_check) { - killem ($_); + $silent_procs++; } } + return if $silent_procs == 0; - # return if the squeue was checked <60 seconds ago - if (defined $squeue_checked && $squeue_checked > time - 60) + # use killem() on procs whose killtime is reached + while (my ($pid, $jobstep) = each %proc) { - return; + if (exists $jobstep->{killtime} + && $jobstep->{killtime} <= time + && $jobstep->{stderr_at} < $last_squeue_check) + { + my $sincewhen = ""; + if ($jobstep->{stderr_at}) { + $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + } + Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); + killem ($pid); + } } - $squeue_checked = time; if (!$have_slurm) { @@ -1155,37 +1292,46 @@ sub check_squeue return; } - # get a list of steps still running - my @squeue = `squeue -s -h -o '%i %j' && echo ok`; - chop @squeue; - if ($squeue[-1] ne "ok") + # Get a list of steps still running. Note: squeue(1) says --steps + # selects a format (which we override anyway) and allows us to + # specify which steps we're interested in (which we don't). + # Importantly, it also changes the meaning of %j from "job name" to + # "step name" and (although this isn't mentioned explicitly in the + # docs) switches from "one line per job" mode to "one line per step" + # mode. Without it, we'd just get a list of one job, instead of a + # list of N steps. + my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`; + if ($? != 0) { + Log(undef, "warning: squeue exit status $? ($!)"); return; } - pop @squeue; + chop @squeue; # which of my jobsteps are running, according to squeue? my %ok; - foreach (@squeue) + for my $jobstepname (@squeue) { - if (/^(\d+)\.(\d+) (\S+)/) - { - if ($1 eq $ENV{SLURM_JOBID}) - { - $ok{$3} = 1; - } - } + $ok{$jobstepname} = 1; } - # which of my active child procs (>60s old) were not mentioned by squeue? - foreach (keys %proc) + # Check for child procs >60s old and not mentioned by squeue. + while (my ($pid, $jobstep) = each %proc) { - if ($proc{$_}->{time} < time - 60 - && !exists $ok{$proc{$_}->{jobstepname}} - && !exists $proc{$_}->{killtime}) + if ($jobstep->{time} < time - 60 + && $jobstep->{jobstepname} + && !exists $ok{$jobstep->{jobstepname}} + && !exists $jobstep->{killtime}) { - # kill this proc if it hasn't exited in 30 seconds - $proc{$_}->{killtime} = time + 30; + # According to slurm, this task has ended (successfully or not) + # -- but our srun child hasn't exited. First we must wait (30 + # seconds) in case this is just a race between communication + # channels. Then, if our srun child process still hasn't + # terminated, we'll conclude some slurm communication + # error/delay has caused the task to die without notifying srun, + # and we'll kill srun ourselves. + $jobstep->{killtime} = time + 30; + Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited"); } } } @@ -1196,7 +1342,7 @@ sub release_allocation if ($have_slurm) { Log (undef, "release job allocation"); - system "scancel $ENV{SLURM_JOBID}"; + system "scancel $ENV{SLURM_JOB_ID}"; } } @@ -1210,6 +1356,7 @@ sub readfrompipes while (0 < sysread ($reader{$job}, $buf, 8192)) { print STDERR $buf if $ENV{CRUNCH_DEBUG}; + $jobstep[$job]->{stderr_at} = time; $jobstep[$job]->{stderr} .= $buf; preprocess_stderr ($job); if (length ($jobstep[$job]->{stderr}) > 16384) @@ -1357,6 +1504,20 @@ print (arvados.api("v1").collections(). return $joboutput; } +# Calls create_output_collection, logs the result, and returns it. +# If that was successful, save that as the output in the job record. +sub save_output_collection { + my $collated_output = create_output_collection(); + + if (!$collated_output) { + Log(undef, "Failed to write output collection"); + } + else { + Log(undef, "job output $collated_output"); + $Job->update_attributes('output' => $collated_output); + } + return $collated_output; +} sub killem { @@ -1402,6 +1563,8 @@ sub fhbits # Send log output to Keep via arv-put. # # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_out_buf is a string containing all output read from arv-put so far. +# $log_pipe_out_select is an IO::Select object around $log_pipe_out. # $log_pipe_pid is the pid of the arv-put subprocess. # # The only functions that should access these variables directly are: @@ -1410,6 +1573,13 @@ sub fhbits # Starts an arv-put pipe, reading data on stdin and writing it to # a $logfilename file in an output collection. # +# log_writer_read_output([$timeout]) +# Read output from $log_pipe_out and append it to $log_pipe_out_buf. +# Passes $timeout to the select() call, with a default of 0.01. +# Returns the result of the last read() call on $log_pipe_out, or +# -1 if read() wasn't called because select() timed out. +# Only other log_writer_* functions should need to call this. +# # log_writer_send($txt) # Writes $txt to the output log collection. # @@ -1420,25 +1590,40 @@ sub fhbits # Returns a true value if there is currently a live arv-put # process, false otherwise. # -my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); +my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select, + $log_pipe_pid); sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', - '--portable-data-hash', - '--project-uuid', $Job->{owner_uuid}, + '--stream', '--retries', '3', - '--name', $logfilename, '--filename', $logfilename, '-'); + $log_pipe_out_buf = ""; + $log_pipe_out_select = IO::Select->new($log_pipe_out); +} + +sub log_writer_read_output { + my $timeout = shift || 0.01; + my $read = -1; + while ($read && $log_pipe_out_select->can_read($timeout)) { + $read = read($log_pipe_out, $log_pipe_out_buf, 65536, + length($log_pipe_out_buf)); + } + if (!defined($read)) { + Log(undef, "error reading log manifest from arv-put: $!"); + } + return $read; } sub log_writer_send($) { my $txt = shift; print $log_pipe_in $txt; + log_writer_read_output(); } sub log_writer_finish() @@ -1446,22 +1631,24 @@ sub log_writer_finish() return unless $log_pipe_pid; close($log_pipe_in); - my $arv_put_output; - my $s = IO::Select->new($log_pipe_out); - if ($s->can_read(120)) { - sysread($log_pipe_out, $arv_put_output, 1024); - chomp($arv_put_output); - } else { + my $read_result = log_writer_read_output(120); + if ($read_result == -1) { Log (undef, "timed out reading from 'arv-put'"); + } elsif ($read_result != 0) { + Log(undef, "failed to read arv-put log manifest to EOF"); } waitpid($log_pipe_pid, 0); - $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; if ($?) { - Log("log_writer_finish: arv-put exited ".exit_status_s($?)) + Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?)) } + close($log_pipe_out); + my $arv_put_output = $log_pipe_out_buf; + $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf = + $log_pipe_out_select = undef; + return $arv_put_output; } @@ -1525,10 +1712,21 @@ sub save_meta return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm return unless log_writer_is_active(); - my $loglocator = log_writer_finish(); - Log (undef, "log manifest is $loglocator"); - $Job->{'log'} = $loglocator; - $Job->update_attributes('log', $loglocator); + my $log_manifest = ""; + if ($Job->{log}) { + my $prev_log_coll = api_call("collections/get", uuid => $Job->{log}); + $log_manifest .= $prev_log_coll->{manifest_text}; + } + $log_manifest .= log_writer_finish(); + + my $log_coll = api_call( + "collections/create", ensure_unique_name => 1, collection => { + manifest_text => $log_manifest, + owner_uuid => $Job->{owner_uuid}, + name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}), + }); + Log(undef, "log collection is " . $log_coll->{portable_data_hash}); + $Job->update_attributes('log' => $log_coll->{portable_data_hash}); } @@ -1604,7 +1802,13 @@ sub srun my $show_cmd = Dumper($args); $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g; $show_cmd =~ s/\n/ /g; - warn "starting: $show_cmd\n"; + if ($opts->{fork}) { + Log(undef, "starting: $show_cmd"); + } else { + # This is a child process: parent is in charge of reading our + # stderr and copying it to Log() if needed. + warn "starting: $show_cmd\n"; + } if (defined $stdin) { my $child = open STDIN, "-|"; @@ -1813,6 +2017,12 @@ sub combined_git_archive { return $tar_contents; } +sub set_nonblocking { + my $fh = shift; + my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!); + fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!); +} + __DATA__ #!/usr/bin/perl # @@ -1840,12 +2050,15 @@ use constant TASK_TEMPFAIL => 111; my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB"); my $destdir = $ENV{"CRUNCH_SRC"}; -my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; +my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"}; my $repo = $ENV{"CRUNCH_SRC_URL"}; my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt"); my $job_work = $ENV{"JOB_WORK"}; my $task_work = $ENV{"TASK_WORK"}; +open(STDOUT_ORIG, ">&", STDOUT); +open(STDERR_ORIG, ">&", STDERR); + for my $dir ($destdir, $job_work, $task_work) { if ($dir) { make_path $dir; @@ -1857,11 +2070,6 @@ if ($task_work) { remove_tree($task_work, {keep_root => 1}); } -open(STDOUT_ORIG, ">&", STDOUT); -open(STDERR_ORIG, ">&", STDERR); -open(STDOUT, ">>", "$destdir.log"); -open(STDERR, ">&", STDOUT); - ### Crunch script run mode if (@ARGV) { # We want to do routine logging during task 0 only. This gives the user @@ -1922,10 +2130,6 @@ if (@ARGV) { } } - close(STDOUT); - close(STDERR); - open(STDOUT, ">&", STDOUT_ORIG); - open(STDERR, ">&", STDERR_ORIG); exec(@ARGV); die "Cannot exec `@ARGV`: $!"; } @@ -1933,26 +2137,43 @@ if (@ARGV) { ### Installation mode open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; -if (readlink ("$destdir.commit") eq $commit && -d $destdir) { - # This version already installed -> nothing to do. +if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) { + # This exact git archive (source + arvados sdk) is already installed + # here, so there's no need to reinstall it. + + # We must consume our DATA section, though: otherwise the process + # feeding it to us will get SIGPIPE. + my $buf; + while (read(DATA, $buf, 65536)) { } + exit(0); } -unlink "$destdir.commit"; +unlink "$destdir.archive_hash"; mkdir $destdir; -if (!open(TARX, "|-", "tar", "-xC", $destdir)) { - die "Error launching 'tar -xC $destdir': $!"; -} -# If we send too much data to tar in one write (> 4-5 MiB), it stops, and we -# get SIGPIPE. We must feed it data incrementally. -my $tar_input; -while (read(DATA, $tar_input, 65536)) { - print TARX $tar_input; -} -if(!close(TARX)) { - die "'tar -xC $destdir' exited $?: $!"; -} +do { + # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1). + local $SIG{PIPE} = "IGNORE"; + warn "Extracting archive: $archive_hash\n"; + # --ignore-zeros is necessary sometimes: depending on how much NUL + # padding tar -A put on our combined archive (which in turn depends + # on the length of the component archives) tar without + # --ignore-zeros will exit before consuming stdin and cause close() + # to fail on the resulting SIGPIPE. + if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) { + die "Error launching 'tar -xC $destdir': $!"; + } + # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we + # get SIGPIPE. We must feed it data incrementally. + my $tar_input; + while (read(DATA, $tar_input, 65536)) { + print TARX $tar_input; + } + if(!close(TARX)) { + die "'tar -xC $destdir' exited $?: $!"; + } +}; mkdir $install_dir; @@ -1969,15 +2190,34 @@ if (-d $sdk_root) { } my $python_dir = "$install_dir/python"; -if ((-d $python_dir) and can_run("python2.7") and - (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) { - # egg_info failed, probably when it asked git for a build tag. - # Specify no build tag. - open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); - print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; - close($pysdk_cfg); +if ((-d $python_dir) and can_run("python2.7")) { + open(my $egg_info_pipe, "-|", + "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null"); + my @egg_info_errors = <$egg_info_pipe>; + close($egg_info_pipe); + if ($?) { + if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) { + # egg_info apparently failed because it couldn't ask git for a build tag. + # Specify no build tag. + open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); + print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; + close($pysdk_cfg); + } else { + my $egg_info_exit = $? >> 8; + foreach my $errline (@egg_info_errors) { + print STDERR_ORIG $errline; + } + warn "python setup.py egg_info failed: exit $egg_info_exit"; + exit ($egg_info_exit || 1); + } + } } +# Hide messages from the install script (unless it fails: shell_or_die +# will show $destdir.log in that case). +open(STDOUT, ">>", "$destdir.log"); +open(STDERR, ">&", STDOUT); + if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir); } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") { @@ -1987,10 +2227,10 @@ if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "./install.sh", $install_dir); } -if ($commit) { - unlink "$destdir.commit.new"; - symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!"; - rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!"; +if ($archive_hash) { + unlink "$destdir.archive_hash.new"; + symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!"; + rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!"; } close L;