X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/313f5fedd4214d077e2b5c7c26bab4df3895c44a..ff7b22c70cd77073d9bdbebac0bf03d43745ed0c:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 617d22f4d1..0d35d53f9d 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -157,20 +157,15 @@ my $dbh; my $sth; my @jobstep; -my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; }); +my $User = api_call("users/current"); if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification - $Job = retry_op(sub { - $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - }); + $Job = api_call("jobs/get", uuid => $jobspec); if (!$force_unlock) { # Claim this job, and make sure nobody else does - eval { retry_op(sub { - # lock() sets is_locked_by_uuid and changes state to Running. - $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'}) - }); }; + eval { api_call("jobs/lock", uuid => $Job->{uuid}); }; if ($@) { Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; @@ -191,7 +186,7 @@ else $Job->{'started_at'} = gmtime; $Job->{'state'} = 'Running'; - $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); }); + $Job = api_call("jobs/create", job => $Job); } $job_id = $Job->{'uuid'}; @@ -321,13 +316,11 @@ if (defined $Job->{thawedfromkey}) } else { - my $first_task = retry_op(sub { - $arv->{'job_tasks'}->{'create'}->execute('job_task' => { - 'job_uuid' => $Job->{'uuid'}, - 'sequence' => 0, - 'qsequence' => 0, - 'parameters' => {}, - }); + my $first_task = api_call("job_tasks/create", job_task => { + 'job_uuid' => $Job->{'uuid'}, + 'sequence' => 0, + 'qsequence' => 0, + 'parameters' => {}, }); push @jobstep, { 'level' => 0, 'failures' => 0, @@ -427,10 +420,8 @@ else { } else { # $repo is none of the above. It must be the name of a hosted # repository. - my $arv_repo_list = retry_op(sub { - $arv->{'repositories'}->{'list'}->execute( - 'filters' => [['name','=',$repo]]); - }); + my $arv_repo_list = api_call("repositories/list", + 'filters' => [['name','=',$repo]]); my @repos_found = @{$arv_repo_list->{'items'}}; my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'}; if ($n_found > 0) { @@ -560,7 +551,9 @@ else { freeze_if_want_freeze ($installpid); select (undef, undef, undef, 0.1); } - Log (undef, "Install script exited ".exit_status_s($?)); + my $install_exited = $?; + Log (undef, "Install script exited ".exit_status_s($install_exited)); + exit (1) if $install_exited != 0; } if (!$have_slurm) @@ -708,17 +701,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); - my $build_script_to_send = ""; my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " ."&& cd $ENV{CRUNCH_TMP} "; - if ($build_script) - { - $build_script_to_send = $build_script; - $command .= - "&& perl -"; - } $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec "; if ($docker_hash) { @@ -747,18 +733,32 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E "; $ENV{TASK_KEEPMOUNT} = "/keep"; - # TASK_WORK is a plain docker data volume: it starts out empty, - # is writable, and persists until no containers use it any - # more. We don't use --volumes-from to share it with other - # containers: it is only accessible to this task, and it goes - # away when this task stops. - $command .= "--volume=\Q$ENV{TASK_WORK}\E "; - - # JOB_WORK is also a plain docker data volume for now. TODO: - # Share a single JOB_WORK volume across all task containers on a - # given worker node, and delete it when the job ends (and, in - # case that doesn't work, when the next job starts). - $command .= "--volume=\Q$ENV{JOB_WORK}\E "; + # TASK_WORK is almost exactly like a docker data volume: it + # starts out empty, is writable, and persists until no + # containers use it any more. We don't use --volumes-from to + # share it with other containers: it is only accessible to this + # task, and it goes away when this task stops. + # + # However, a docker data volume is writable only by root unless + # the mount point already happens to exist in the container with + # different permissions. Therefore, we [1] assume /tmp already + # exists in the image and is writable by the crunch user; [2] + # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be + # writable if they are created by docker while setting up the + # other --volumes); and [3] create $TASK_WORK inside the + # container using $build_script. + $command .= "--volume=/tmp "; + $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname"; + $ENV{"HOME"} = $ENV{"TASK_WORK"}; + $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated + + # TODO: Share a single JOB_WORK volume across all task + # containers on a given worker node, and delete it when the job + # ends (and, in case that doesn't work, when the next job + # starts). + # + # For now, use the same approach as TASK_WORK above. + $ENV{"JOB_WORK"} = "/tmp/crunch-job-work"; while (my ($env_key, $env_val) = each %ENV) { @@ -769,16 +769,16 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $command .= "--env=\QHOME=$ENV{HOME}\E "; $command .= "\Q$docker_hash\E "; $command .= "stdbuf --output=0 --error=0 "; - $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; $command .= "stdbuf --output=0 --error=0 "; - $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; + $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } my @execargs = ('bash', '-c', $command); - srun (\@srunargs, \@execargs, undef, $build_script_to_send); + srun (\@srunargs, \@execargs, undef, $build_script); # exec() failed, we assume nothing happened. die "srun() failed on build script\n"; } @@ -935,10 +935,8 @@ else { while (my $manifest_line = <$orig_manifest>) { $orig_manifest_text .= $manifest_line; } - my $output = retry_op(sub { - $arv->{'collections'}->{'create'}->execute( - 'collection' => {'manifest_text' => $orig_manifest_text}); - }); + my $output = api_call("collections/create", collection => { + 'manifest_text' => $orig_manifest_text}); Log(undef, "output uuid " . $output->{uuid}); Log(undef, "output hash " . $output->{portable_data_hash}); $Job->update_attributes('output' => $output->{portable_data_hash}); @@ -1072,15 +1070,14 @@ sub reapchildren my $newtask_list = []; my $newtask_results; do { - $newtask_results = retry_op(sub { - $arv->{'job_tasks'}->{'list'}->execute( - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, - 'order' => 'qsequence', - 'offset' => scalar(@$newtask_list), - ); - }); + $newtask_results = api_call( + "job_tasks/list", + 'where' => { + 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} + }, + 'order' => 'qsequence', + 'offset' => scalar(@$newtask_list), + ); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); foreach my $arvados_task (@$newtask_list) { @@ -1103,9 +1100,7 @@ sub check_refresh_wanted my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; - my $Job2 = retry_op(sub { - $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - }); + my $Job2 = api_call("jobs/get", uuid => $jobspec); for my $attr ('cancelled_at', 'cancelled_by_user_uuid', 'cancelled_by_client_uuid', @@ -1620,9 +1615,7 @@ sub find_docker_image { # If not, return undef for both values. my $locator = shift; my ($streamname, $filename); - my $image = retry_op(sub { - $arv->{collections}->{get}->execute(uuid => $locator); - }); + my $image = api_call("collections/get", uuid => $locator); if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); @@ -1669,10 +1662,14 @@ sub retry_count { } sub retry_op { - # Given a function reference, call it with the remaining arguments. If - # it dies, retry it with exponential backoff until it succeeds, or until - # the current retry_count is exhausted. + # Pass in two function references. + # This method will be called with the remaining arguments. + # If it dies, retry it with exponential backoff until it succeeds, + # or until the current retry_count is exhausted. After each failure + # that can be retried, the second function will be called with + # the current try count (0-based), next try time, and error message. my $operation = shift; + my $retry_callback = shift; my $retries = retry_count(); foreach my $try_count (0..$retries) { my $next_try = time + (2 ** $try_count); @@ -1680,6 +1677,7 @@ sub retry_op { if (!$@) { return $result; } elsif ($try_count < $retries) { + $retry_callback->($try_count, $next_try, $@); my $sleep_time = $next_try - time; sleep($sleep_time) if ($sleep_time > 0); } @@ -1690,6 +1688,32 @@ sub retry_op { die($@ . "\n"); } +sub api_call { + # Pass in a /-separated API method name, and arguments for it. + # This function will call that method, retrying as needed until + # the current retry_count is exhausted, with a log on the first failure. + my $method_name = shift; + my $log_api_retry = sub { + my ($try_count, $next_try_at, $errmsg) = @_; + $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; + $errmsg =~ s/\s/ /g; + $errmsg =~ s/\s+$//; + my $retry_msg; + if ($next_try_at < time) { + $retry_msg = "Retrying."; + } else { + my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", $next_try_at); + $retry_msg = "Retrying at $next_try_fmt."; + } + Log(undef, "API method $method_name failed: $errmsg. $retry_msg"); + }; + my $method = $arv; + foreach my $key (split(/\//, $method_name)) { + $method = $method->{$key}; + } + return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_); +} + sub exit_status_s { # Given a $?, return a human-readable exit code string like "0" or # "1" or "0 with signal 1" or "1 with signal 11". @@ -1715,9 +1739,10 @@ use File::Path qw( make_path remove_tree ); my $destdir = $ENV{"CRUNCH_SRC"}; my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; my $repo = $ENV{"CRUNCH_SRC_URL"}; +my $job_work = $ENV{"JOB_WORK"}; my $task_work = $ENV{"TASK_WORK"}; -for my $dir ($destdir, $task_work) { +for my $dir ($destdir, $job_work, $task_work) { if ($dir) { make_path $dir; -e $dir or die "Failed to create temporary directory ($dir): $!"; @@ -1728,16 +1753,17 @@ if ($task_work) { remove_tree($task_work, {keep_root => 1}); } +my @git_archive_data = ; +if (!@git_archive_data) { + # Nothing to extract -> nothing to install. + run_argv_and_exit(); +} open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; if (readlink ("$destdir.commit") eq $commit && -d $destdir) { - if (@ARGV) { - exec(@ARGV); - die "Cannot exec `@ARGV`: $!"; - } else { - exit 0; - } + # This version already installed -> nothing to do. + run_argv_and_exit(); } unlink "$destdir.commit"; @@ -1746,13 +1772,10 @@ open STDOUT, ">", "$destdir.log"; open STDERR, ">&STDOUT"; mkdir $destdir; -my @git_archive_data = ; -if (@git_archive_data) { - open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; - print TARX @git_archive_data; - if(!close(TARX)) { - die "'tar -C $destdir -xf -' exited $?: $!"; - } +open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; +print TARX @git_archive_data; +if(!close(TARX)) { + die "'tar -C $destdir -xf -' exited $?: $!"; } my $pwd; @@ -1784,11 +1807,16 @@ if ($commit) { close L; -if (@ARGV) { +run_argv_and_exit(); + +sub run_argv_and_exit +{ + if (@ARGV) { exec(@ARGV); die "Cannot exec `@ARGV`: $!"; -} else { + } else { exit 0; + } } sub shell_or_die