X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8cbabbbe014628574a10a48148d179c14137d61f..febdebbb58592be73dcf7d4bd4b2c7ff96657741:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 5a92176e7f..b8afe638ac 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,10 +1,9 @@ #!/usr/bin/env perl +# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: AGPL-3.0 -# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- - =head1 NAME crunch-job: Execute job steps, save snapshots as requested, collate output. @@ -133,6 +132,7 @@ my $resume_stash; my $cgroup_root = "/sys/fs/cgroup"; my $docker_bin = "docker.io"; my $docker_run_args = ""; +my $srun_sync_timeout = 15*60; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, @@ -142,6 +142,7 @@ GetOptions('force-unlock' => \$force_unlock, 'cgroup-root=s' => \$cgroup_root, 'docker-bin=s' => \$docker_bin, 'docker-run-args=s' => \$docker_run_args, + 'srun-sync-timeout=i' => \$srun_sync_timeout, ); if (defined $job_api_token) { @@ -837,8 +838,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) close($_); } fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec - open(STDOUT,">&writer"); - open(STDERR,">&writer"); + open(STDOUT,">&writer") or croak ($!); + open(STDERR,">&writer") or croak ($!); undef $dbh; undef $sth; @@ -1022,7 +1023,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) delete $Jobstep->{tempfail}; $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime}); - $Jobstep->{'arvados_task'}->save; + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); splice @jobstep_todo, $todo_ptr, 1; --$todo_ptr; @@ -1188,6 +1189,8 @@ sub reapchildren . $slot[$proc{$pid}->{slot}]->{cpu}); my $jobstepidx = $proc{$pid}->{jobstepidx}; + readfrompipes_after_exit ($jobstepidx); + $children_reaped++; my $elapsed = time - $proc{$pid}->{time}; my $Jobstep = $jobstep[$jobstepidx]; @@ -1205,7 +1208,7 @@ sub reapchildren "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", exit_status_s($childstatus))); $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); $task_success = 0; } @@ -1258,8 +1261,7 @@ sub reapchildren $Jobstep->{exitcode} = $childstatus; $Jobstep->{finishtime} = time; $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); - $Jobstep->{'arvados_task'}->save; - process_stderr_final ($jobstepidx); + retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API"); Log ($jobstepidx, sprintf("task output (%d bytes): %s", length($Jobstep->{'arvados_task'}->{output}), $Jobstep->{'arvados_task'}->{output})); @@ -1544,7 +1546,7 @@ sub preprocess_stderr $st->{node}->{fail_count}++; } } - elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) { + elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) { $jobstep[$jobstepidx]->{tempfail} = 1; if (defined($job_slot_index)) { $slot[$job_slot_index]->{node}->{fail_count}++; @@ -1562,9 +1564,27 @@ sub preprocess_stderr } -sub process_stderr_final +# Read whatever is still available on its stderr+stdout pipes after +# the given child process has exited. +sub readfrompipes_after_exit { my $jobstepidx = shift; + + # The fact that the child has exited allows some convenient + # simplifications: (1) all data must have already been written, so + # there's no need to wait for more once sysread returns 0; (2) the + # total amount of data available is bounded by the pipe buffer size, + # so it's safe to read everything into one string. + my $buf; + while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) { + $jobstep[$jobstepidx]->{stderr_at} = time; + $jobstep[$jobstepidx]->{stderr} .= $buf; + } + if ($jobstep[$jobstepidx]->{stdout_r}) { + while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) { + $jobstep[$jobstepidx]->{stdout_captured} .= $buf; + } + } preprocess_stderr ($jobstepidx); map { @@ -1989,6 +2009,8 @@ sub srun_sync my ($stdout_r, $stdout_w); pipe $stdout_r, $stdout_w or croak("pipe() failed: $!"); + my $started_srun = scalar time; + my $srunpid = fork(); if ($srunpid == 0) { @@ -1996,8 +2018,8 @@ sub srun_sync close($stdout_r); fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec fcntl($stdout_w, F_SETFL, 0) or croak($!); - open(STDERR, ">&", $stderr_w); - open(STDOUT, ">&", $stdout_w); + open(STDERR, ">&", $stderr_w) or croak ($!); + open(STDOUT, ">&", $stdout_w) or croak ($!); srun ($srunargs, $execargs, $opts, $stdin); exit (1); } @@ -2032,12 +2054,17 @@ sub srun_sync if (!$busy) { select(undef, undef, undef, 0.1); } + if (($started_srun + $srun_sync_timeout) < scalar time) { + # Exceeded general timeout for "srun_sync" operations, likely + # means something got stuck on the remote node. + Log(undef, "srun_sync exceeded timeout, will fail."); + $main::please_freeze = 1; + } killem(keys %proc) if $main::please_freeze; } my $exited = $?; - 1 while readfrompipes(); - process_stderr_final ($jobstepidx); + readfrompipes_after_exit ($jobstepidx); Log (undef, "$label: exit ".exit_status_s($exited)); @@ -2177,8 +2204,22 @@ sub retry_op { # that can be retried, the second function will be called with # the current try count (0-based), next try time, and error message. my $operation = shift; - my $retry_callback = shift; + my $op_text = shift; my $retries = retry_count(); + my $retry_callback = sub { + my ($try_count, $next_try_at, $errmsg) = @_; + $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; + $errmsg =~ s/\s/ /g; + $errmsg =~ s/\s+$//; + my $retry_msg; + if ($next_try_at < time) { + $retry_msg = "Retrying."; + } else { + my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); + $retry_msg = "Retrying at $next_try_fmt."; + } + Log(undef, "$op_text failed: $errmsg. $retry_msg"); + }; foreach my $try_count (0..$retries) { my $next_try = time + (2 ** $try_count); my $result = eval { $operation->(@_); }; @@ -2201,25 +2242,11 @@ sub api_call { # This function will call that method, retrying as needed until # the current retry_count is exhausted, with a log on the first failure. my $method_name = shift; - my $log_api_retry = sub { - my ($try_count, $next_try_at, $errmsg) = @_; - $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; - $errmsg =~ s/\s/ /g; - $errmsg =~ s/\s+$//; - my $retry_msg; - if ($next_try_at < time) { - $retry_msg = "Retrying."; - } else { - my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); - $retry_msg = "Retrying at $next_try_fmt."; - } - Log(undef, "API method $method_name failed: $errmsg. $retry_msg"); - }; my $method = $arv; foreach my $key (split(/\//, $method_name)) { $method = $method->{$key}; } - return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_); + return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_); } sub exit_status_s { @@ -2493,8 +2520,8 @@ if ((-d $python_dir) and can_run("python2.7")) { # Hide messages from the install script (unless it fails: shell_or_die # will show $destdir.log in that case). -open(STDOUT, ">>", "$destdir.log"); -open(STDERR, ">&", STDOUT); +open(STDOUT, ">>", "$destdir.log") or die ($!); +open(STDERR, ">&", STDOUT) or die ($!); if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir); @@ -2515,7 +2542,7 @@ close L; sub can_run { my $command_name = shift; - open(my $which, "-|", "which", $command_name); + open(my $which, "-|", "which", $command_name) or die ($!); while (<$which>) { } close($which); return ($? == 0);