X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/55aafbb07904ca24390dd47ea960eae7cb2b909a..a524fabd054d2b923eceb575de95674caca58e85:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index c0e3075c9b..f2e9fc2878 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,7 +1,7 @@ #!/usr/bin/env perl # Copyright (C) The Arvados Authors. All rights reserved. # -# SPDX-License-Identifier: Apache-2.0 +# SPDX-License-Identifier: AGPL-3.0 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- @@ -189,7 +189,7 @@ if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); -my ($exited, $stdout, $stderr) = srun_sync( +my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], $cmd, {label => "sanity check"}); @@ -397,7 +397,7 @@ if (!defined $no_clear_tmp) { # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean # up work directories crunch_tmp/work, crunch_tmp/opt, # crunch_tmp/src*. - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], ['bash', '-ec', q{ arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP} @@ -405,7 +405,7 @@ rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRU }], {label => "clean work dirs"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } } @@ -439,20 +439,23 @@ fi echo >&2 "image loaded successfully" }; - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=" . join(',', @node)], ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script], {label => "load docker image"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } # Determine whether this version of Docker supports memory+swap limits. - ($exited, $stdout, $stderr) = srun_sync( + ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], [$docker_bin, 'run', '--help'], {label => "check --memory-swap feature"}); + if ($tempfail) { + exit_retry_unlocked(); + } $docker_limitmem = ($stdout =~ /--memory-swap/); # Find a non-root Docker user to use. @@ -472,7 +475,7 @@ echo >&2 "image loaded successfully" $label = "check whether user '$try_user' is UID 0"; $try_user_arg = "--user=$try_user"; } - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], ["/bin/sh", "-ec", "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"], @@ -486,6 +489,8 @@ echo >&2 "image loaded successfully" Log(undef, "Container will run with $dockeruserarg"); } last; + } elsif ($tempfail) { + exit_retry_unlocked(); } } @@ -678,11 +683,14 @@ else { "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); - my ($stdout, $stderr); - ($exited, $stdout, $stderr) = srun_sync( + my ($stdout, $stderr, $tempfail); + ($exited, $stdout, $stderr, $tempfail) = srun_sync( \@srunargs, \@execargs, {label => "run install script on all workers"}, - $build_script . $git_archive); + $build_script . $git_archive); + if ($tempfail) { + exit_retry_unlocked(); + } my $stderr_anything_from_script = 0; for my $line (split(/\n/, $stderr)) { @@ -829,8 +837,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) close($_); } fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec - open(STDOUT,">&writer"); - open(STDERR,">&writer"); + open(STDOUT,">&writer") or croak ($!); + open(STDERR,">&writer") or croak ($!); undef $dbh; undef $sth; @@ -1014,7 +1022,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) delete $Jobstep->{tempfail}; $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime}); - $Jobstep->{'arvados_task'}->save; + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); splice @jobstep_todo, $todo_ptr, 1; --$todo_ptr; @@ -1117,7 +1125,7 @@ if (!defined $main::success) } elsif ($working_slot_count < 1) { save_output_collection(); save_meta(); - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } elsif ($thisround_succeeded == 0 && ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; @@ -1197,7 +1205,7 @@ sub reapchildren "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", exit_status_s($childstatus))); $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); $task_success = 0; } @@ -1250,7 +1258,7 @@ sub reapchildren $Jobstep->{exitcode} = $childstatus; $Jobstep->{finishtime} = time; $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); - $Jobstep->{'arvados_task'}->save; + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); process_stderr_final ($jobstepidx); Log ($jobstepidx, sprintf("task output (%d bytes): %s", length($Jobstep->{'arvados_task'}->{output}), @@ -1536,7 +1544,7 @@ sub preprocess_stderr $st->{node}->{fail_count}++; } } - elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) { + elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) { $jobstep[$jobstepidx]->{tempfail} = 1; if (defined($job_slot_index)) { $slot[$job_slot_index]->{node}->{fail_count}++; @@ -1988,8 +1996,8 @@ sub srun_sync close($stdout_r); fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec fcntl($stdout_w, F_SETFL, 0) or croak($!); - open(STDERR, ">&", $stderr_w); - open(STDOUT, ">&", $stdout_w); + open(STDERR, ">&", $stderr_w) or croak ($!); + open(STDOUT, ">&", $stdout_w) or croak ($!); srun ($srunargs, $execargs, $opts, $stdin); exit (1); } @@ -2044,7 +2052,7 @@ sub srun_sync if ($main::please_freeze || $j->{tempfail}) { $exited ||= 255; } - return ($exited, $j->{stdout_captured}, $j->{stderr_captured}); + return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail}); } @@ -2132,6 +2140,11 @@ sub find_docker_image { } } +sub exit_retry_unlocked { + Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED); + exit(EX_RETRY_UNLOCKED); +} + sub retry_count { # Calculate the number of times an operation should be retried, # assuming exponential backoff, and that we're willing to retry as @@ -2164,8 +2177,22 @@ sub retry_op { # that can be retried, the second function will be called with # the current try count (0-based), next try time, and error message. my $operation = shift; - my $retry_callback = shift; + my $op_text = shift; my $retries = retry_count(); + my $retry_callback = sub { + my ($try_count, $next_try_at, $errmsg) = @_; + $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; + $errmsg =~ s/\s/ /g; + $errmsg =~ s/\s+$//; + my $retry_msg; + if ($next_try_at < time) { + $retry_msg = "Retrying."; + } else { + my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); + $retry_msg = "Retrying at $next_try_fmt."; + } + Log(undef, "$op_text failed: $errmsg. $retry_msg"); + }; foreach my $try_count (0..$retries) { my $next_try = time + (2 ** $try_count); my $result = eval { $operation->(@_); }; @@ -2188,25 +2215,11 @@ sub api_call { # This function will call that method, retrying as needed until # the current retry_count is exhausted, with a log on the first failure. my $method_name = shift; - my $log_api_retry = sub { - my ($try_count, $next_try_at, $errmsg) = @_; - $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; - $errmsg =~ s/\s/ /g; - $errmsg =~ s/\s+$//; - my $retry_msg; - if ($next_try_at < time) { - $retry_msg = "Retrying."; - } else { - my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); - $retry_msg = "Retrying at $next_try_fmt."; - } - Log(undef, "API method $method_name failed: $errmsg. $retry_msg"); - }; my $method = $arv; foreach my $key (split(/\//, $method_name)) { $method = $method->{$key}; } - return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_); + return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_); } sub exit_status_s { @@ -2480,8 +2493,8 @@ if ((-d $python_dir) and can_run("python2.7")) { # Hide messages from the install script (unless it fails: shell_or_die # will show $destdir.log in that case). -open(STDOUT, ">>", "$destdir.log"); -open(STDERR, ">&", STDOUT); +open(STDOUT, ">>", "$destdir.log") or die ($!); +open(STDERR, ">&", STDOUT) or die ($!); if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir); @@ -2502,7 +2515,7 @@ close L; sub can_run { my $command_name = shift; - open(my $which, "-|", "which", $command_name); + open(my $which, "-|", "which", $command_name) or die ($!); while (<$which>) { } close($which); return ($? == 0);