my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
my $User = $arv->{'users'}->{'current'}->execute;
Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
exit EX_TEMPFAIL;
}
+ if ($Job->{'state'} ne 'Queued') {
+ Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
+ exit EX_TEMPFAIL;
+ }
if ($Job->{'success'} ne undef) {
Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
exit EX_TEMPFAIL;
$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
}
- $Job->update_attributes('started_at' => scalar gmtime,
- 'running' => 1,
- 'success' => undef,
+ $Job->update_attributes('state' => 'Running',
'tasks_summary' => { 'failed' => 0,
'todo' => 1,
'running' => 0,
save_meta();
if ($job_has_uuid) {
- $Job->update_attributes('running' => 0,
- 'success' => $collated_output && $main::success,
- 'finished_at' => scalar gmtime)
+ if ($collated_output && $main::success) {
+ $Job->update_attributes('state' => 'Complete')
+ } else {
+ $Job->update_attributes('state' => 'Failed')
+ }
}
-exit ($Job->{'success'} ? 1 : 0);
+# Exit 0 only when the job ended in the 'Complete' state, 1 otherwise.
+# 'state' is a string: compare with 'ne'. Numeric '!=' would numify both
+# sides to 0 and make this always exit 0, even for failed jobs.
+# NOTE(review): assumes update_attributes() refreshes $Job->{'state'}.
+exit ($Job->{'state'} ne 'Complete' ? 1 : 0);
my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
for my $attr ('cancelled_at',
'cancelled_by_user_uuid',
- 'cancelled_by_client_uuid') {
+ 'cancelled_by_client_uuid',
+ 'state') {
$Job->{$attr} = $Job2->{$attr};
}
- if ($Job->{'cancelled_at'}) {
- Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
- " by user " . $Job->{cancelled_by_user_uuid});
+ if ($Job->{'state'} ne "Running") {
+ if ($Job->{'state'} eq "Cancelled") {
+ Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
+ } else {
+ Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
+ }
$main::success = 0;
$main::please_freeze = 1;
}
}
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in is the handle we write log text to (arv-put's stdin).
+# $log_pipe_out is the handle we read arv-put's stdout from.
+# $log_pipe_pid is the pid of the arv-put subprocess, or undef when no
+# log writer is running.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+# Starts an arv-put pipe that reads data on its stdin and stores it
+# as a file named $logfilename in an output collection.
+#
+# log_writer_send($txt)
+# Writes $txt to the output log collection.
+#
+# log_writer_finish()
+# Closes the arv-put pipe and returns the output it produces (the
+# portable data hash of the log collection).
+#
+# log_writer_is_active()
+# Returns a true value if there is currently a live arv-put
+# process, false otherwise.
+#
+my $log_pipe_in;   # write end: arv-put's stdin
+my $log_pipe_out;  # read end: arv-put's stdout
+my $log_pipe_pid;  # arv-put pid; undef while no log writer is running
+
+# Start an arv-put subprocess that reads log text on its stdin and
+# stores it as a file named $logfilename in a new collection.
+# open2() returns the child pid, which is kept in $log_pipe_pid and
+# doubles as the "log writer is active" flag.
+#
+# No prototype: this sub appears to be called (log_writer_start($keep_logfile))
+# before its definition is compiled. A prototype cannot be checked at such a
+# call and only triggers a "called too early to check prototype" warning.
+sub log_writer_start
+{
+  my ($logfilename) = @_;
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
+                        'arv-put', '--portable-data-hash',
+                        '--retries', '3',
+                        '--filename', $logfilename,
+                        '-');
+}
+
+sub log_writer_send($)
+{
+  # Forward one chunk of log text to arv-put's stdin.
+  my ($chunk) = @_;
+  print {$log_pipe_in} $chunk;
+}
+
+# Close the arv-put pipe, wait for arv-put to exit, and return the first
+# chunk (up to 1024 bytes) of its stdout with the trailing newline removed.
+# Returns nothing if no log writer is active. Logs (to STDERR only, since
+# the writer is already shut down by then) if arv-put exits non-zero.
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  # EOF on arv-put's stdin tells it that the log data is complete.
+  close($log_pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($log_pipe_out);
+  if ($s->can_read(120)) {
+    # Only chomp if sysread produced a buffer (it returns undef on error).
+    if (defined sysread($log_pipe_out, $arv_put_output, 1024)) {
+      chomp($arv_put_output);
+    }
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($log_pipe_pid, 0);
+  # Capture the child's status right away, before anything else can touch $?.
+  my $arv_put_status = $?;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+  if ($arv_put_status) {
+    # Log takes ($jobstep_id, $logmessage); the message must be the 2nd arg.
+    Log (undef, "log_writer_finish: arv-put returned error $arv_put_status");
+  }
+
+  return $arv_put_output;
+}
+
+# Returns the arv-put pid (a true value) while a log writer is running,
+# undef otherwise; log_writer_finish() clears it.
+sub log_writer_is_active() {
+  my $writer_pid = $log_pipe_pid;
+  return $writer_pid;
+}
+
sub Log # ($jobstep_id, $logmessage)
{
if ($_[1] =~ /\n/) {
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
- if ($local_logfile || -t STDERR) {
+ if (log_writer_is_active() || -t STDERR) {
my @gmtime = gmtime;
$datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
$gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
}
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
- if ($local_logfile) {
- print $local_logfile $datetime . " " . $message;
+ if (log_writer_is_active()) {
+ log_writer_send($datetime . " " . $message);
}
}
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
cleanup();
- save_meta() if $local_logfile;
+ save_meta() if log_writer_is_active();
die;
}
sub cleanup
{
return if !$job_has_uuid;
- $Job->update_attributes('running' => 0,
- 'success' => 0,
- 'finished_at' => scalar gmtime);
+  # A job already in the 'Cancelled' state keeps that state and only gets
+  # a finish time recorded; any other job reaching cleanup is set 'Failed'.
+  my @final_attrs = ($Job->{'state'} eq 'Cancelled')
+      ? ('finished_at' => scalar gmtime)
+      : ('state' => 'Failed');
+  $Job->update_attributes(@final_attrs);
}
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
- $local_logfile->flush;
- my $retry_count = put_retry_count();
- my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
- "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
- my $loglocator = `$cmd`;
- die "system $cmd failed: $?" if $?;
- chomp($loglocator);
-
- $local_logfile = undef; # the temp file is automatically deleted
+ my $loglocator = log_writer_finish();
Log (undef, "log manifest is $loglocator");
$Job->{'log'} = $loglocator;
$Job->update_attributes('log', $loglocator) if $job_has_uuid;