X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1da127f1e63485ac225cb16511013094fd7e84f6..41190e074c3c4504b7627757250a1b188b86b54e:/sdk/cli/bin/crunch-job
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 70f379e53f..f20cb733eb 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -141,7 +141,6 @@ $SIG{'USR2'} = sub
 
 
 my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
 
 my $User = $arv->{'users'}->{'current'}->execute;
 
@@ -161,6 +160,10 @@ if ($job_has_uuid)
     Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
     exit EX_TEMPFAIL;
   }
+  if ($Job->{'state'} ne 'Queued') {
+    Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
+    exit EX_TEMPFAIL;
+  }
   if ($Job->{'success'} ne undef) {
     Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
     exit EX_TEMPFAIL;
@@ -195,7 +198,7 @@ else
 $job_id = $Job->{'uuid'};
 
 my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
 
 $Job->{'runtime_constraints'} ||= {};
 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@@ -287,9 +290,7 @@ if ($job_has_uuid)
     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
     exit EX_TEMPFAIL;
   }
-  $Job->update_attributes('started_at' => scalar gmtime,
-                          'running' => 1,
-                          'success' => undef,
+  $Job->update_attributes('state' => 'Running',
                           'tasks_summary' => { 'failed' => 0,
                                                'todo' => 1,
                                                'running' => 0,
@@ -876,12 +877,14 @@ Log (undef, "finish");
 
 save_meta();
 if ($job_has_uuid) {
-  $Job->update_attributes('running' => 0,
-                          'success' => $collated_output && $main::success,
-                          'finished_at' => scalar gmtime)
+  if ($collated_output && $main::success) {
+    $Job->update_attributes('state' => 'Complete')
+  } else {
+    $Job->update_attributes('state' => 'Failed')
+  }
 }
 
-exit ($Job->{'success'} ? 1 : 0);
+exit (($collated_output && $main::success) ? 0 : 1);  # same test that picks Complete/Failed; nonzero exit on failure
 
 
 
@@ -1033,12 +1036,16 @@ sub check_refresh_wanted
     my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
     for my $attr ('cancelled_at',
                   'cancelled_by_user_uuid',
-                  'cancelled_by_client_uuid') {
+                  'cancelled_by_client_uuid',
+                  'state') {
       $Job->{$attr} = $Job2->{$attr};
     }
-    if ($Job->{'cancelled_at'}) {
-      Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
-           " by user " . $Job->{cancelled_by_user_uuid});
+    if ($Job->{'state'} ne "Running") {
+      if ($Job->{'state'} eq "Cancelled") {
+        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
+      } else {
+        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
+      }
       $main::success = 0;
       $main::please_freeze = 1;
     }
@@ -1294,6 +1301,73 @@ sub fhbits
 }
 
 
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+#     Starts an arv-put pipe, reading data on stdin and writing it to
+#     a $logfilename file in an output collection.
+#
+# log_writer_send($txt)
+#     Writes $txt to the output log collection.
+#
+# log_writer_finish()
+#     Closes the arv-put pipe and returns the output that it produces.
+#
+# log_writer_is_active()
+#     Returns a true value if there is currently a live arv-put
+#     process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+
+sub log_writer_start($)
+{
+  my $logfilename = shift;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put', '--portable-data-hash',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+}
+
+sub log_writer_send($)
+{
+  my $txt = shift;
+  print $log_pipe_in $txt;
+}
+
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  close($log_pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($log_pipe_out);
+  if ($s->can_read(120)) {
+    sysread($log_pipe_out, $arv_put_output, 1024);
+    chomp($arv_put_output);
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+  if ($?) {
+    Log(undef, "log_writer_finish: arv-put returned error $?");  # Log's 1st arg is the jobstep id
+  }
+
+  return $arv_put_output;
+}
+
+sub log_writer_is_active() {
+  return $log_pipe_pid;
+}
+
 sub Log # ($jobstep_id, $logmessage)
 {
   if ($_[1] =~ /\n/) {
@@ -1307,15 +1381,15 @@ sub Log # ($jobstep_id, $logmessage)
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
-  if ($local_logfile || -t STDERR) {
+  if (log_writer_is_active() || -t STDERR) {
     my @gmtime = gmtime;
     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
   }
   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
 
-  if ($local_logfile) {
-    print $local_logfile $datetime . " " . $message;
+  if (log_writer_is_active()) {
+    log_writer_send($datetime . " " . $message);
   }
 }
 
@@ -1328,7 +1402,7 @@ sub croak
   freeze() if @jobstep_todo;
   collate_output() if @jobstep_todo;
   cleanup();
-  save_meta() if $local_logfile;
+  save_meta() if log_writer_is_active();
   die;
 }
 
@@ -1336,9 +1410,11 @@ sub croak
 sub cleanup
 {
   return if !$job_has_uuid;
-  $Job->update_attributes('running' => 0,
-                          'success' => 0,
-                          'finished_at' => scalar gmtime);
+  if ($Job->{'state'} eq 'Cancelled') {
+    $Job->update_attributes('finished_at' => scalar gmtime);
+  } else {
+    $Job->update_attributes('state' => 'Failed');
+  }
 }
 
 
@@ -1347,15 +1423,7 @@ sub save_meta
   my $justcheckpoint = shift; # false if this will be the last meta saved
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
 
-  $local_logfile->flush;
-  my $retry_count = put_retry_count();
-  my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
-      "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
-  my $loglocator = `$cmd`;
-  die "system $cmd failed: $?" if $?;
-  chomp($loglocator);
-
-  $local_logfile = undef;   # the temp file is automatically deleted
+  my $loglocator = log_writer_finish();
   Log (undef, "log manifest is $loglocator");
   $Job->{'log'} = $loglocator;
   $Job->update_attributes('log', $loglocator) if $job_has_uuid;