Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
=head1 OPTIONS
=item --git-dir
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
=item --job-api-token
setup. This can speed up development and debugging when running jobs
locally.
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
=back
=head1 RUNNING JOBS LOCALLY
my $arv = Arvados->new('apiVersion' => 'v1');
-my $local_logfile;
my $User = $arv->{'users'}->{'current'}->execute;
# $jobspec is an Arvados UUID, not a JSON job specification
$Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
if (!$force_unlock) {
- # If some other crunch-job process has grabbed this job (or we see
- # other evidence that the job is already underway) we exit
- # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
- # mark the job as failed.
- if ($Job->{'is_locked_by_uuid'}) {
- Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
- exit EX_TEMPFAIL;
- }
- if ($Job->{'state'} ne 'Queued') {
- Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
- exit EX_TEMPFAIL;
- }
- if ($Job->{'success'} ne undef) {
- Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
- exit EX_TEMPFAIL;
- }
- if ($Job->{'running'}) {
- Log(undef, "Job 'running' flag is already set");
+ # Claim this job, and make sure nobody else does
+ eval {
+ # lock() sets is_locked_by_uuid and changes state to Running.
+ $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
+ };
+ if ($@) {
+ Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
- }
- if ($Job->{'started_at'}) {
- Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
- exit EX_TEMPFAIL;
- }
+ };
}
}
else
$Job->{'is_locked_by_uuid'} = $User->{'uuid'};
$Job->{'started_at'} = gmtime;
+ $Job->{'state'} = 'Running';
$Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
}
$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
-$local_logfile = File::Temp->new();
+log_writer_start($keep_logfile);
$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
-
-my $jobmanager_id;
-# Claim this job, and make sure nobody else does
-unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
- $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
- Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
- exit EX_TEMPFAIL;
-}
-$Job->update_attributes('state' => 'Running',
- 'tasks_summary' => { 'failed' => 0,
- 'todo' => 1,
- 'running' => 0,
- 'done' => 0 });
-
+$Job->update_attributes(
+ 'tasks_summary' => { 'failed' => 0,
+ 'todo' => 1,
+ 'running' => 0,
+ 'done' => 0 });
Log (undef, "start");
$SIG{'INT'} = sub { $main::please_freeze = 1; };
my $git_archive;
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
- # If we're in user-land (i.e., not called from crunch-dispatch)
- # script_version can be an absolute directory path, signifying we
- # should work straight out of that directory instead of using a git
- # commit.
+ # If script_version looks like an absolute path, *and* the --git-dir
+ # argument was not given -- which implies we were not invoked by
+ # crunch-dispatch -- we will use the given path as a working
+ # directory instead of resolving script_version to a git commit (or
+ # doing anything else with git).
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
$ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
}
# repository.
my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
'filters' => [['name','=',$repo]]
- )->{'items'};
- my $n_found = scalar @{$arv_repo_list};
+ );
+ my @repos_found = @{$arv_repo_list->{'items'}};
+ my $n_found = $arv_repo_list->{'items_available'};
if ($n_found > 0) {
Log(undef, "Repository '$repo' -> "
- . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
+ . join(", ", map { $_->{'uuid'} } @repos_found));
}
if ($n_found != 1) {
croak("Error: Found $n_found repositories with name '$repo'.");
}
- $repo = $arv_repo_list->[0]->{'fetch_url'};
+ $repo = $repos_found[0]->{'fetch_url'};
$repo_location = 'remote';
}
Log(undef, "Using $repo_location repository '$repo'");
# our local cache first, since that's cheaper. (We don't want to
# do that with tags/branches though -- those change over time, so
# they should always be resolved by the remote repo.)
- if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
+ if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
# Hide stderr because it's normal for this to fail:
my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
if ($? == 0 &&
- $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
+ # Careful not to resolve a branch named abcdeff to commit 1234567:
+ $sha1 =~ /^$treeish/ &&
$sha1 =~ /^([0-9a-f]{40})$/s) {
$commit = $1;
Log(undef, "Commit $commit already present in $local_repo");
}
+# Send log output to Keep via arv-put.
+#
+# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_pid is the pid of the arv-put subprocess.
+#
+# The only functions that should access these variables directly are:
+#
+# log_writer_start($logfilename)
+# Starts an arv-put pipe, reading data on stdin and writing it to
+# a $logfilename file in an output collection.
+#
+# log_writer_send($txt)
+# Writes $txt to the output log collection.
+#
+# log_writer_finish()
+# Closes the arv-put pipe and returns the output that it produces
+# (undef if no pipe is active, or if arv-put produces no output
+# within the read timeout).
+#
+# log_writer_is_active()
+# Returns a true value if there is currently a live arv-put
+# process, false otherwise.
+#
+my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+
+# Spawn the arv-put subprocess that receives log text on its stdin and
+# stores it under the given file name. The child's pid and both pipe
+# handles are kept in the file-level $log_pipe_* variables.
+sub log_writer_start($)
+{
+  my ($keep_filename) = @_;
+  my @arv_put_cmd = (
+    'arv-put', '--portable-data-hash',
+    '--retries', '3',
+    '--filename', $keep_filename,
+    '-',
+  );
+  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, @arv_put_cmd);
+}
+
+# Write one chunk of text to the stdin of the running arv-put process.
+sub log_writer_send($)
+{
+  my ($text) = @_;
+  print {$log_pipe_in} $text;
+}
+
+# Close the write end of the arv-put pipe, collect whatever arv-put
+# printed on its stdout, and reap the subprocess.
+#
+# Returns the (chomped) arv-put output, or undef if no log writer was
+# active or nothing could be read within the 120 second timeout.
+sub log_writer_finish()
+{
+  return unless $log_pipe_pid;
+
+  # Take local copies and mark the writer inactive *before* anything
+  # below calls Log(): once the write end is closed, Log() must not try
+  # to forward messages to it through log_writer_send().
+  my ($pid, $pipe_in, $pipe_out) =
+      ($log_pipe_pid, $log_pipe_in, $log_pipe_out);
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
+
+  # Closing our end signals end-of-input to arv-put.
+  close($pipe_in);
+  my $arv_put_output;
+
+  my $s = IO::Select->new($pipe_out);
+  if ($s->can_read(120)) {
+    sysread($pipe_out, $arv_put_output, 1024);
+    # sysread can fail and leave the buffer undefined.
+    chomp($arv_put_output) if defined $arv_put_output;
+  } else {
+    Log (undef, "timed out reading from 'arv-put'");
+  }
+
+  waitpid($pid, 0);
+  # Capture the child status immediately; later calls may overwrite $?.
+  my $exit_status = $?;
+  if ($exit_status) {
+    # Log() takes ($jobstep_id, $logmessage): the message is the 2nd arg.
+    Log(undef, "log_writer_finish: arv-put exited ".exit_status_s($exit_status));
+  }
+
+  return $arv_put_output;
+}
+
+# Report whether a log writer is currently running: returns the pid
+# stored in $log_pipe_pid (true while set, false once it has been
+# cleared by log_writer_finish or before log_writer_start has run).
+sub log_writer_is_active() {
+ return $log_pipe_pid;
+}
+
sub Log # ($jobstep_id, $logmessage)
{
if ($_[1] =~ /\n/) {
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
- if ($local_logfile || -t STDERR) {
+ if (log_writer_is_active() || -t STDERR) {
my @gmtime = gmtime;
$datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
$gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
}
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
- if ($local_logfile) {
- print $local_logfile $datetime . " " . $message;
+ if (log_writer_is_active()) {
+ log_writer_send($datetime . " " . $message);
}
}
Log (undef, $message);
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
- cleanup() if $Job;
- save_meta() if $local_logfile;
+ cleanup();
+ save_meta();
die;
}
sub cleanup
{
+ return unless $Job;
if ($Job->{'state'} eq 'Cancelled') {
$Job->update_attributes('finished_at' => scalar gmtime);
} else {
{
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
+ return unless log_writer_is_active();
- $local_logfile->flush;
- my $retry_count = put_retry_count();
- my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
- "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
- my $loglocator = `$cmd`;
- die "system $cmd exited ".exit_status_s($?) if $?;
- chomp($loglocator);
-
- $local_logfile = undef; # the temp file is automatically deleted
+ my $loglocator = log_writer_finish();
Log (undef, "log manifest is $loglocator");
$Job->{'log'} = $loglocator;
$Job->update_attributes('log', $loglocator);