Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
=head1 OPTIONS
=item --git-dir
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
=item --job-api-token
setup. This can speed up development and debugging when running jobs
locally.
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
=back
=head1 RUNNING JOBS LOCALLY
my $arv = Arvados->new('apiVersion' => 'v1');
-my $User = $arv->{'users'}->{'current'}->execute;
-
my $Job;
my $job_id;
my $dbh;
my $sth;
+my @jobstep;
+
+my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
- $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ $Job = retry_op(sub {
+ $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ });
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
- eval {
+ eval { retry_op(sub {
# lock() sets is_locked_by_uuid and changes state to Running.
$arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
- };
+ }); };
if ($@) {
Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
$Job->{'started_at'} = gmtime;
$Job->{'state'} = 'Running';
- $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
+ $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
}
$job_id = $Job->{'uuid'};
$ENV{"JOB_UUID"} = $job_id;
-my @jobstep;
my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
}
else
{
- my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
- 'job_uuid' => $Job->{'uuid'},
- 'sequence' => 0,
- 'qsequence' => 0,
- 'parameters' => {},
- });
+ my $first_task = retry_op(sub {
+ $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+ 'job_uuid' => $Job->{'uuid'},
+ 'sequence' => 0,
+ 'qsequence' => 0,
+ 'parameters' => {},
+ });
+ });
push @jobstep, { 'level' => 0,
'failures' => 0,
'arvados_task' => $first_task,
my $git_archive;
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
- # If we're in user-land (i.e., not called from crunch-dispatch)
- # script_version can be an absolute directory path, signifying we
- # should work straight out of that directory instead of using a git
- # commit.
+ # If script_version looks like an absolute path, *and* the --git-dir
+ # argument was not given -- which implies we were not invoked by
+ # crunch-dispatch -- we will use the given path as a working
+ # directory instead of resolving script_version to a git commit (or
+ # doing anything else with git).
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
$ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
}
else {
+ # Resolve the given script_version to a git commit sha1. Also, if
+ # the repository is remote, clone it into our local filesystem: this
+ # ensures "git archive" will work, and is necessary to reliably
+ # resolve a symbolic script_version like "master^".
$ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
- # Install requested code version
Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
} else {
# $repo is none of the above. It must be the name of a hosted
# repository.
- my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
- 'filters' => [['name','=',$repo]]
- )->{'items'};
- my $n_found = scalar @{$arv_repo_list};
+ my $arv_repo_list = retry_op(sub {
+ $arv->{'repositories'}->{'list'}->execute(
+ 'filters' => [['name','=',$repo]]);
+ });
+ my @repos_found = @{$arv_repo_list->{'items'}};
+ my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
if ($n_found > 0) {
Log(undef, "Repository '$repo' -> "
- . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
+ . join(", ", map { $_->{'uuid'} } @repos_found));
}
if ($n_found != 1) {
croak("Error: Found $n_found repositories with name '$repo'.");
}
- $repo = $arv_repo_list->[0]->{'fetch_url'};
+ $repo = $repos_found[0]->{'fetch_url'};
$repo_location = 'remote';
}
Log(undef, "Using $repo_location repository '$repo'");
# our local cache first, since that's cheaper. (We don't want to
# do that with tags/branches though -- those change over time, so
# they should always be resolved by the remote repo.)
- if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
+ if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
# Hide stderr because it's normal for this to fail:
my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
if ($? == 0 &&
- $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
+ # Careful not to resolve a branch named abcdeff to commit 1234567:
+ $sha1 =~ /^$treeish/ &&
$sha1 =~ /^([0-9a-f]{40})$/s) {
$commit = $1;
Log(undef, "Commit $commit already present in $local_repo");
while (my $manifest_line = <$orig_manifest>) {
$orig_manifest_text .= $manifest_line;
}
- my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
- 'manifest_text' => $orig_manifest_text,
+ my $output = retry_op(sub {
+ $arv->{'collections'}->{'create'}->execute(
+ 'collection' => {'manifest_text' => $orig_manifest_text});
});
Log(undef, "output uuid " . $output->{uuid});
Log(undef, "output hash " . $output->{portable_data_hash});
my $newtask_list = [];
my $newtask_results;
do {
- $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
+ $newtask_results = retry_op(sub {
+ $arv->{'job_tasks'}->{'list'}->execute(
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ });
push(@$newtask_list, @{$newtask_results->{items}});
} while (@{$newtask_results->{items}});
foreach my $arvados_task (@$newtask_list) {
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
if (@stat && $stat[9] > $latest_refresh) {
$latest_refresh = scalar time;
- my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ my $Job2 = retry_op(sub {
+ $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ });
for my $attr ('cancelled_at',
'cancelled_by_user_uuid',
'cancelled_by_client_uuid',
my ($child_out, $child_in);
my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
- '--retries', put_retry_count());
+ '--retries', retry_count());
my $joboutput;
for (@jobstep)
{
Log (undef, $message);
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
- cleanup() if $Job;
- save_meta() if log_writer_is_active();
+ cleanup();
+ save_meta();
die;
}
sub cleanup
{
+ return unless $Job;
if ($Job->{'state'} eq 'Cancelled') {
$Job->update_attributes('finished_at' => scalar gmtime);
} else {
{
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
+ return unless log_writer_is_active();
my $loglocator = log_writer_finish();
Log (undef, "log manifest is $loglocator");
# If not, return undef for both values.
my $locator = shift;
my ($streamname, $filename);
- if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
+ my $image = retry_op(sub {
+ $arv->{collections}->{get}->execute(uuid => $locator);
+ });
+ if ($image) {
foreach my $line (split(/\n/, $image->{manifest_text})) {
my @tokens = split(/\s+/, $line);
next if (!@tokens);
}
}
-sub put_retry_count {
-  # Calculate a --retries argument for arv-put that will have it try
-  # approximately as long as this Job has been running.
-  my $stoptime = shift || time;
-  my $starttime = $jobstep[0]->{starttime};
-  my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
-  my $retries = 0;
-  while ($timediff >= 2) {
-    $retries++;
-    $timediff /= 2;
+sub retry_count {
+  # Calculate the number of times an operation should be retried,
+  # assuming exponential backoff, and that we're willing to retry as
+  # long as tasks have been running. Enforce a minimum of 3 retries.
+  #
+  # Returns: a non-negative integer retry count, never less than 3.
+  my ($starttime, $endtime, $timediff, $retries);
+  if (@jobstep) {
+    # Use the first step's start and the last step's finish as the
+    # bounds of "how long tasks have been running".
+    $starttime = $jobstep[0]->{starttime};
+    $endtime = $jobstep[-1]->{finishtime};
+  }
+  if (!defined($starttime)) {
+    # No step has started yet, so there is no elapsed time to base the
+    # retry budget on; fall through to the minimum below.
+    $timediff = 0;
+  } elsif (!defined($endtime)) {
+    # Steps are still running: budget scales with elapsed runtime.
+    $timediff = time - $starttime;
+  } else {
+    # All steps finished: total runtime, minus the time already spent
+    # idle since the last step finished. NOTE(review): presumably this
+    # shrinks the budget as the job winds down -- confirm intent.
+    $timediff = ($endtime - $starttime) - (time - $endtime);
+  }
+  if ($timediff > 0) {
+    # With 2**n backoff, n = log2(timediff) retries span ~timediff secs.
+    $retries = int(log($timediff) / log(2));
+  } else {
+    $retries = 1; # Use the minimum.
  }
  return ($retries > 3) ? $retries : 3;
}
+sub retry_op {
+  # Given a function reference, call it with the remaining arguments. If
+  # it dies, retry it with exponential backoff until it succeeds, or until
+  # the current retry_count is exhausted.
+  #
+  # Arguments: $operation (coderef), followed by any arguments to pass
+  # through to it on every attempt.
+  # Returns: $operation's result on success (evaluated in scalar
+  # context, since eval's result is assigned to a scalar).
+  # Dies: re-throws the final attempt's error if all retries fail.
+  my $operation = shift;
+  # Snapshot the retry budget once, before the first attempt.
+  my $retries = retry_count();
+  foreach my $try_count (0..$retries) {
+    # Schedule the next attempt 2**$try_count seconds from now; time
+    # spent inside $operation itself counts toward that wait.
+    my $next_try = time + (2 ** $try_count);
+    my $result = eval { $operation->(@_); };
+    if (!$@) {
+      return $result;
+    } elsif ($try_count < $retries) {
+      my $sleep_time = $next_try - time;
+      sleep($sleep_time) if ($sleep_time > 0);
+    }
+  }
+  # All attempts failed; propagate the last error.
+  # Ensure the error message ends in a newline, so Perl doesn't add
+  # retry_op's line number to it.
+  chomp($@);
+  die($@ . "\n");
+}
+
sub exit_status_s {
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".