X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1da9a2a61d66601ab9a02bff439d610ee19c5932..e359c70eb7adb66df7c6aae6edb738e5f543d6e4:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index c536da66a7..2246c86fb6 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree ); use constant TASK_TEMPFAIL => 111; use constant EX_TEMPFAIL => 75; +use constant EX_RETRY_UNLOCKED => 93; $ENV{"TMPDIR"} ||= "/tmp"; unless (defined $ENV{"CRUNCH_TMP"}) { @@ -139,7 +140,7 @@ if (defined $job_api_token) { $ENV{ARVADOS_API_TOKEN} = $job_api_token; } -my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; +my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST}; $SIG{'USR1'} = sub @@ -721,6 +722,7 @@ ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; +my $working_slot_count = scalar(@slot); @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; @@ -986,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { update_progress_stats(); } + $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 && + $_->{node}->{hold_count} < 4 } @slot); if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { @@ -1009,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } # give up if no nodes are succeeding - if (!grep { $_->{node}->{losing_streak} == 0 && - $_->{node}->{hold_count} < 4 } @slot) { - my $message = "Every node has failed -- giving up on this round"; - Log (undef, $message); + if ($working_slot_count < 1) { + Log(undef, "Every node has failed -- giving up"); last THISROUND; } } @@ -1048,18 +1050,18 @@ freeze_if_want_freeze(); if (!defined $main::success) { - if (@jobstep_todo && - $thisround_succeeded == 0 && - ($thisround_failed == 0 || $thisround_failed > 4)) - { + if (!@jobstep_todo) { + $main::success = 1; + } elsif ($working_slot_count < 1) { + save_output_collection(); + save_meta(); + exit(EX_RETRY_UNLOCKED); + } elsif ($thisround_succeeded == 0 && + ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $main::success = 0; } - if (!@jobstep_todo) - { - $main::success = 1; - } } goto ONELEVEL if !defined $main::success; @@ -1067,16 +1069,7 @@ goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -my $collated_output = &create_output_collection(); - -if (!$collated_output) { - Log (undef, "Failed to write output collection"); -} -else { - Log(undef, "job output $collated_output"); - $Job->update_attributes('output' => $collated_output); -} - +my $collated_output = save_output_collection(); Log (undef, "finish"); save_meta(); @@ -1284,7 +1277,11 @@ sub check_squeue && $jobstep->{killtime} <= time && $jobstep->{stderr_at} < $last_squeue_check) { - Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task disappeared from slurm queue)"); + my $sincewhen = ""; + if ($jobstep->{stderr_at}) { + $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + } + Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); killem ($pid); } } @@ -1295,8 +1292,15 @@ sub check_squeue return; } - # get a list of steps still running - my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOBID}\E --steps --format='%i %j' --noheader`; + # Get a list of steps still running. Note: squeue(1) says --steps + # selects a format (which we override anyway) and allows us to + # specify which steps we're interested in (which we don't). + # Importantly, it also changes the meaning of %j from "job name" to + # "step name" and (although this isn't mentioned explicitly in the + # docs) switches from "one line per job" mode to "one line per step" + # mode. Without it, we'd just get a list of one job, instead of a + # list of N steps. + my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`; if ($? != 0) { Log(undef, "warning: squeue exit status $? ($!)"); @@ -1306,15 +1310,9 @@ sub check_squeue # which of my jobsteps are running, according to squeue? my %ok; - foreach (@squeue) + for my $jobstepname (@squeue) { - if (/^(\d+)\.(\d+) (\S+)/) - { - if ($1 eq $ENV{SLURM_JOBID}) - { - $ok{$3} = 1; - } - } + $ok{$jobstepname} = 1; } # Check for child procs >60s old and not mentioned by squeue. @@ -1344,7 +1342,7 @@ sub release_allocation if ($have_slurm) { Log (undef, "release job allocation"); - system "scancel $ENV{SLURM_JOBID}"; + system "scancel $ENV{SLURM_JOB_ID}"; } } @@ -1506,6 +1504,20 @@ print (arvados.api("v1").collections(). return $joboutput; } +# Calls create_output_collection, logs the result, and returns it. +# If that was successful, save that as the output in the job record. +sub save_output_collection { + my $collated_output = create_output_collection(); + + if (!$collated_output) { + Log(undef, "Failed to write output collection"); + } + else { + Log(undef, "job output $collated_output"); + $Job->update_attributes('output' => $collated_output); + } + return $collated_output; +} sub killem { @@ -1551,6 +1563,8 @@ sub fhbits # Send log output to Keep via arv-put. # # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_out_buf is a string containing all output read from arv-put so far. +# $log_pipe_out_select is an IO::Select object around $log_pipe_out. # $log_pipe_pid is the pid of the arv-put subprocess. # # The only functions that should access these variables directly are: @@ -1559,6 +1573,13 @@ sub fhbits # Starts an arv-put pipe, reading data on stdin and writing it to # a $logfilename file in an output collection. # +# log_writer_read_output([$timeout]) +# Read output from $log_pipe_out and append it to $log_pipe_out_buf. +# Passes $timeout to the select() call, with a default of 0.01. +# Returns the result of the last read() call on $log_pipe_out, or +# -1 if read() wasn't called because select() timed out. +# Only other log_writer_* functions should need to call this. +# # log_writer_send($txt) # Writes $txt to the output log collection. # @@ -1569,25 +1590,40 @@ sub fhbits # Returns a true value if there is currently a live arv-put # process, false otherwise. # -my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); +my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select, + $log_pipe_pid); sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', - '--portable-data-hash', - '--project-uuid', $Job->{owner_uuid}, + '--stream', '--retries', '3', - '--name', $logfilename, '--filename', $logfilename, '-'); + $log_pipe_out_buf = ""; + $log_pipe_out_select = IO::Select->new($log_pipe_out); +} + +sub log_writer_read_output { + my $timeout = shift || 0.01; + my $read = -1; + while ($read && $log_pipe_out_select->can_read($timeout)) { + $read = read($log_pipe_out, $log_pipe_out_buf, 65536, + length($log_pipe_out_buf)); + } + if (!defined($read)) { + Log(undef, "error reading log manifest from arv-put: $!"); + } + return $read; } sub log_writer_send($) { my $txt = shift; print $log_pipe_in $txt; + log_writer_read_output(); } sub log_writer_finish() @@ -1595,22 +1631,24 @@ sub log_writer_finish() return unless $log_pipe_pid; close($log_pipe_in); - my $arv_put_output; - my $s = IO::Select->new($log_pipe_out); - if ($s->can_read(120)) { - sysread($log_pipe_out, $arv_put_output, 1024); - chomp($arv_put_output); - } else { + my $read_result = log_writer_read_output(120); + if ($read_result == -1) { Log (undef, "timed out reading from 'arv-put'"); + } elsif ($read_result != 0) { + Log(undef, "failed to read arv-put log manifest to EOF"); } waitpid($log_pipe_pid, 0); - $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; if ($?) { - Log("log_writer_finish: arv-put exited ".exit_status_s($?)) + Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?)) } + close($log_pipe_out); + my $arv_put_output = $log_pipe_out_buf; + $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf = + $log_pipe_out_select = undef; + return $arv_put_output; } @@ -1674,10 +1712,21 @@ sub save_meta return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm return unless log_writer_is_active(); - my $loglocator = log_writer_finish(); - Log (undef, "log manifest is $loglocator"); - $Job->{'log'} = $loglocator; - $Job->update_attributes('log', $loglocator); + my $log_manifest = ""; + if ($Job->{log}) { + my $prev_log_coll = api_call("collections/get", uuid => $Job->{log}); + $log_manifest .= $prev_log_coll->{manifest_text}; + } + $log_manifest .= log_writer_finish(); + + my $log_coll = api_call( + "collections/create", ensure_unique_name => 1, collection => { + manifest_text => $log_manifest, + owner_uuid => $Job->{owner_uuid}, + name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}), + }); + Log(undef, "log collection is " . $log_coll->{portable_data_hash}); + $Job->update_attributes('log' => $log_coll->{portable_data_hash}); } @@ -2141,13 +2190,27 @@ if (-d $sdk_root) { } my $python_dir = "$install_dir/python"; -if ((-d $python_dir) and can_run("python2.7") and - (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) { - # egg_info failed, probably when it asked git for a build tag. - # Specify no build tag. - open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); - print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; - close($pysdk_cfg); +if ((-d $python_dir) and can_run("python2.7")) { + open(my $egg_info_pipe, "-|", + "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null"); + my @egg_info_errors = <$egg_info_pipe>; + close($egg_info_pipe); + if ($?) { + if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) { + # egg_info apparently failed because it couldn't ask git for a build tag. + # Specify no build tag. + open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); + print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; + close($pysdk_cfg); + } else { + my $egg_info_exit = $? >> 8; + foreach my $errline (@egg_info_errors) { + print STDERR_ORIG $errline; + } + warn "python setup.py egg_info failed: exit $egg_info_exit"; + exit ($egg_info_exit || 1); + } + } } # Hide messages from the install script (unless it fails: shell_or_die