From: Brett Smith Date: Tue, 7 Oct 2014 13:35:11 +0000 (-0400) Subject: 4012: crunch-job retries all API operations. X-Git-Tag: 1.1.0~2123^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/f0b10a9e56225799f9821b07d1497ce53c2608bf 4012: crunch-job retries all API operations. This will make jobs more robust against transient errors when talking to the API server. --- diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 00d8389d96..2cfd2dc08a 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -141,22 +141,26 @@ $SIG{'USR2'} = sub my $arv = Arvados->new('apiVersion' => 'v1'); -my $User = $arv->{'users'}->{'current'}->execute; - my $Job; my $job_id; my $dbh; my $sth; +my @jobstep; + +my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; }); + if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification - $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + $Job = retry_op(sub { + $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + }); if (!$force_unlock) { # Claim this job, and make sure nobody else does - eval { + eval { retry_op(sub { # lock() sets is_locked_by_uuid and changes state to Running. $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'}) - }; + }); }; if ($@) { Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; @@ -177,7 +181,7 @@ else $Job->{'started_at'} = gmtime; $Job->{'state'} = 'Running'; - $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job); + $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); }); } $job_id = $Job->{'uuid'}; @@ -290,7 +294,6 @@ $ENV{"CRUNCH_JOB_UUID"} = $job_id; $ENV{"JOB_UUID"} = $job_id; -my @jobstep; my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); @@ -308,12 +311,14 @@ if (defined $Job->{thawedfromkey}) } else { - my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => { - 'job_uuid' => $Job->{'uuid'}, - 'sequence' => 0, - 'qsequence' => 0, - 'parameters' => {}, - }); + my $first_task = retry_op(sub { + $arv->{'job_tasks'}->{'create'}->execute('job_task' => { + 'job_uuid' => $Job->{'uuid'}, + 'sequence' => 0, + 'qsequence' => 0, + 'parameters' => {}, + }); + }); push @jobstep, { 'level' => 0, 'failures' => 0, 'arvados_task' => $first_task, @@ -408,9 +413,10 @@ else { } else { # $repo is none of the above. It must be the name of a hosted # repository. - my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute( - 'filters' => [['name','=',$repo]] - )->{'items'}; + my $arv_repo_list = retry_op(sub { + $arv->{'repositories'}->{'list'}->execute( + 'filters' => [['name','=',$repo]])->{'items'}; + }); my $n_found = scalar @{$arv_repo_list}; if ($n_found > 0) { Log(undef, "Repository '$repo' -> " @@ -898,8 +904,9 @@ else { while (my $manifest_line = <$orig_manifest>) { $orig_manifest_text .= $manifest_line; } - my $output = $arv->{'collections'}->{'create'}->execute('collection' => { - 'manifest_text' => $orig_manifest_text, + my $output = retry_op(sub { + $arv->{'collections'}->{'create'}->execute( + 'collection' => {'manifest_text' => $orig_manifest_text}); }); Log(undef, "output uuid " . $output->{uuid}); Log(undef, "output hash " . $output->{portable_data_hash}); @@ -1034,13 +1041,15 @@ sub reapchildren my $newtask_list = []; my $newtask_results; do { - $newtask_results = $arv->{'job_tasks'}->{'list'}->execute( - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence', - 'offset' => scalar(@$newtask_list), - ); + $newtask_results = retry_op(sub { + $arv->{'job_tasks'}->{'list'}->execute( + 'where' => { + 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} + }, + 'order' => 'qsequence', + 'offset' => scalar(@$newtask_list), + ); + }); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); foreach my $arvados_task (@$newtask_list) { @@ -1063,7 +1072,9 @@ sub check_refresh_wanted my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; - my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + my $Job2 = retry_op(sub { + $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + }); for my $attr ('cancelled_at', 'cancelled_by_user_uuid', 'cancelled_by_client_uuid', @@ -1244,7 +1255,7 @@ sub collate_output my ($child_out, $child_in); my $pid = open2($child_out, $child_in, 'arv-put', '--raw', - '--retries', put_retry_count()); + '--retries', retry_count()); my $joboutput; for (@jobstep) { @@ -1574,7 +1585,10 @@ sub find_docker_image { # If not, return undef for both values. my $locator = shift; my ($streamname, $filename); - if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) { + my $image = retry_op(sub { + $arv->{collections}->{get}->execute(uuid => $locator); + }); + if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); next if (!@tokens); @@ -1595,20 +1609,52 @@ sub find_docker_image { } } -sub put_retry_count { - # Calculate a --retries argument for arv-put that will have it try - # approximately as long as this Job has been running. - my $stoptime = shift || time; - my $starttime = $jobstep[0]->{starttime}; - my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1; - my $retries = 0; - while ($timediff >= 2) { - $retries++; - $timediff /= 2; +sub retry_count { + # Calculate the number of times an operation should be retried, + # assuming exponential backoff, and that we're willing to retry as + # long as tasks have been running. Enforce a minimum of 3 retries. + my ($starttime, $endtime, $timediff, $retries); + if (@jobstep) { + $starttime = $jobstep[0]->{starttime}; + $endtime = $jobstep[-1]->{finishtime}; + } + if (!defined($starttime)) { + $timediff = 0; + } elsif (!defined($endtime)) { + $timediff = time - $starttime; + } else { + $timediff = ($endtime - $starttime) - (time - $endtime); + } + if ($timediff > 0) { + $retries = int(log($timediff) / log(2)); + } else { + $retries = 1; # Use the minimum. } return ($retries > 3) ? $retries : 3; } +sub retry_op { + # Given a function reference, call it with the remaining arguments. If + # it dies, retry it with exponential backoff until it succeeds, or until + # the current retry_count is exhausted. + my $operation = shift; + my $retries = retry_count(); + foreach my $try_count (0..$retries) { + my $next_try = time + (2 ** $try_count); + my $result = eval { $operation->(@_); }; + if (!$@) { + return $result; + } elsif ($try_count < $retries) { + my $sleep_time = $next_try - time; + sleep($sleep_time) if ($sleep_time > 0); + } + } + # Ensure the error message ends in a newline, so Perl doesn't add + # retry_op's line number to it. + chomp($@); + die($@ . "\n"); +} + sub exit_status_s { # Given a $?, return a human-readable exit code string like "0" or # "1" or "0 with signal 1" or "1 with signal 11".