X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/55b017847fe87e00e7fb9c6bfd0444b83f8ca12c..febdebbb58592be73dcf7d4bd4b2c7ff96657741:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index b4cb21405f..b8afe638ac 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,5 +1,8 @@ #!/usr/bin/env perl # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 =head1 NAME @@ -129,6 +132,7 @@ my $resume_stash; my $cgroup_root = "/sys/fs/cgroup"; my $docker_bin = "docker.io"; my $docker_run_args = ""; +my $srun_sync_timeout = 15*60; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, @@ -138,6 +142,7 @@ GetOptions('force-unlock' => \$force_unlock, 'cgroup-root=s' => \$cgroup_root, 'docker-bin=s' => \$docker_bin, 'docker-run-args=s' => \$docker_run_args, + 'srun-sync-timeout=i' => \$srun_sync_timeout, ); if (defined $job_api_token) { @@ -185,7 +190,7 @@ if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); -my ($exited, $stdout, $stderr) = srun_sync( +my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], $cmd, {label => "sanity check"}); @@ -355,6 +360,7 @@ my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; my $squeue_checked = 0; +my $sinfo_checked = 0; my $latest_refresh = scalar time; @@ -392,15 +398,15 @@ if (!defined $no_clear_tmp) { # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean # up work directories crunch_tmp/work, crunch_tmp/opt, # crunch_tmp/src*. - # - # TODO: When #5036 is done and widely deployed, we can limit mount's - # -t option to simply fuse.keep. - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'], + ['bash', '-ec', q{ +arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP} +rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid + }], {label => "clean work dirs"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } } @@ -416,34 +422,41 @@ if ($docker_locator = $Job->{docker_image_locator}) { Log (undef, "docker image hash is $docker_hash"); $docker_stream =~ s/^\.//; my $docker_install_script = qq{ -if $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then - exit 0 +loaded() { + id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1 + echo "image ID is \$id" + [[ \${id} = \Q$docker_hash\E ]] +} +if loaded >&2 2>/dev/null; then + echo >&2 "image is already present" + exit 0 fi -declare -a exit_codes=("\${PIPESTATUS[@]}") -if [ 0 != "\${exit_codes[0]}" ]; then - exit "\${exit_codes[0]}" # `docker images` failed -elif [ 1 != "\${exit_codes[1]}" ]; then - exit "\${exit_codes[1]}" # `grep` encountered an error -else - # Everything worked fine, but grep didn't find the image on this host. 
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +echo >&2 "docker image is not present; loading" +arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +if ! loaded >&2; then + echo >&2 "`docker load` exited 0, but image is not found (!)" + exit 1 fi +echo >&2 "image loaded successfully" }; - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=" . join(',', @node)], ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script], {label => "load docker image"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } # Determine whether this version of Docker supports memory+swap limits. - ($exited, $stdout, $stderr) = srun_sync( + ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], [$docker_bin, 'run', '--help'], {label => "check --memory-swap feature"}); + if ($tempfail) { + exit_retry_unlocked(); + } $docker_limitmem = ($stdout =~ /--memory-swap/); # Find a non-root Docker user to use. @@ -463,7 +476,7 @@ fi $label = "check whether user '$try_user' is UID 0"; $try_user_arg = "--user=$try_user"; } - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], ["/bin/sh", "-ec", "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"], @@ -477,6 +490,8 @@ fi Log(undef, "Container will run with $dockeruserarg"); } last; + } elsif ($tempfail) { + exit_retry_unlocked(); } } @@ -669,11 +684,14 @@ else { "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); - my ($stdout, $stderr); - ($exited, $stdout, $stderr) = srun_sync( + my ($stdout, $stderr, $tempfail); + ($exited, $stdout, $stderr, $tempfail) = srun_sync( \@srunargs, \@execargs, {label => "run install script on all workers"}, - $build_script . $git_archive); + $build_script . $git_archive); + if ($tempfail) { + exit_retry_unlocked(); + } my $stderr_anything_from_script = 0; for my $line (split(/\n/, $stderr)) { @@ -714,6 +732,15 @@ foreach (split (/\n/, $Job->{knobs})) { Log (undef, "knob " . 
$_);
 }
 
+my $resp = api_call(
+  'nodes/list',
+  'filters' => [['hostname', 'in', \@node]],
+  'order' => 'hostname',
+  'limit' => scalar(@node),
+  );
+for my $n (@{$resp->{items}}) {
+  Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties}));
+}
 
@@ -811,8 +838,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     close($_);
   }
   fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
-  open(STDOUT,">&writer");
-  open(STDERR,">&writer");
+  open(STDOUT,">&writer") or croak ($!);
+  open(STDERR,">&writer") or croak ($!);
   undef $dbh;
   undef $sth;
 
@@ -862,12 +889,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
-        .q{&& if which crunchrunner >/dev/null ; then VOLUME_CRUNCHRUNNER=--volume=$(which crunchrunner):/usr/local/bin/crunchrunner ; fi }
-        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUME_CERTS=--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt ; }
-        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUME_CERTS=--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt ; fi };
+        .q{&& declare -a VOLUMES=() }
+        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
+        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
+        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
 
     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
 
@@ -934,7 +959,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 
     # Bind mount the crunchrunner binary and host TLS certificates file into
     # the container.
-    $command .= "\$VOLUME_CRUNCHRUNNER \$VOLUME_CERTS ";
+    $command .= '"${VOLUMES[@]}" ';
 
     while (my ($env_key, $env_val) = each %ENV)
 
@@ -998,7 +1023,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     delete $Jobstep->{tempfail};
 
     $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
-    $Jobstep->{'arvados_task'}->save;
+    retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
 
     splice @jobstep_todo, $todo_ptr, 1;
     --$todo_ptr;
 
@@ -1101,7 +1126,7 @@ if (!defined $main::success)
   } elsif ($working_slot_count < 1) {
     save_output_collection();
     save_meta();
-    exit(EX_RETRY_UNLOCKED);
+    exit_retry_unlocked();
   } elsif ($thisround_succeeded == 0 &&
            ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
 
@@ -1118,10 +1143,10 @@ freeze();
 my $collated_output = save_output_collection();
 Log (undef, "finish");
 
-save_meta();
+my $final_log = save_meta();
 
 my $final_state;
-if ($collated_output && $main::success) {
+if ($collated_output && $final_log && $main::success) {
   $final_state = 'Complete';
 } else {
   $final_state = 'Failed';
 
@@ -1164,6 +1189,8 @@ sub reapchildren
                     . $slot[$proc{$pid}->{slot}]->{cpu});
   my $jobstepidx = $proc{$pid}->{jobstepidx};
 
+  readfrompipes_after_exit ($jobstepidx);
+
   $children_reaped++;
   my $elapsed = time - $proc{$pid}->{time};
   my $Jobstep = $jobstep[$jobstepidx];
 
@@ -1181,7 +1208,7 @@ sub reapchildren
                   "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
                   exit_status_s($childstatus)));
       $Jobstep->{'arvados_task'}->{success} = 0;
-      $Jobstep->{'arvados_task'}->save;
+      retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
       $task_success = 0;
     }
 
@@ -1234,8 +1261,7 @@ sub reapchildren
   $Jobstep->{exitcode} = $childstatus;
   $Jobstep->{finishtime} = time;
   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
-  $Jobstep->{'arvados_task'}->save;
-  process_stderr_final ($jobstepidx);
+  retry_op(sub { $Jobstep->{'arvados_task'}->save; }, "job_tasks.update API");
   Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                             length($Jobstep->{'arvados_task'}->{output}),
                             $Jobstep->{'arvados_task'}->{output}));
 
@@ -1403,6 +1429,37 @@ sub check_squeue
   }
 }
 
+sub check_sinfo
+{
+  # If a node fails in a multi-node "srun" call during job setup, the call
+  # may hang instead of exiting with a nonzero code.  This function checks
+  # "sinfo" for the health of the nodes that were allocated and ensures that
+  # they are all still in the "alloc" state.  If a node that is allocated to
+  # this job is not in "alloc" state, then set please_freeze.
+  #
+  # This is only called from srun_sync() for node configuration.  If a
+  # node fails doing actual work, there are other recovery mechanisms.
+
+  # Do not call `sinfo` more than once every 15 seconds.
+  return if $sinfo_checked > time - 15;
+  $sinfo_checked = time;
+
+  # The output format "%t" means output node states.
+  my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`;
+  if ($? != 0)
+  {
+    Log(undef, "warning: sinfo exit status $? ($!)");
+    return;
+  }
+  chop @sinfo;
+
+  foreach (@sinfo)
+  {
+    if ($_ ne "alloc" && $_ ne "alloc*") {
+      $main::please_freeze = 1;
+    }
+  }
+}
 
 sub release_allocation
 {
 
@@ -1479,18 +1536,24 @@ sub preprocess_stderr
     my $line = $1;
     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
     Log ($jobstepidx, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
-      # whoa.
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
+      # If the allocation is revoked, we can't possibly continue, so mark all
+      # nodes as failed.  This will cause the overall exit code to be
+      # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
+      # this job.
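+      # (Marking every node failed drives $working_slot_count to zero, so
+      # the "$working_slot_count < 1" branch of the main loop saves state
+      # and exits via exit_retry_unlocked().)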
$main::please_freeze = 1; + foreach my $st (@slot) { + $st->{node}->{fail_count}++; + } } - elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) { + elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) { $jobstep[$jobstepidx]->{tempfail} = 1; if (defined($job_slot_index)) { $slot[$job_slot_index]->{node}->{fail_count}++; ban_node_by_slot($job_slot_index); } } - elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) { + elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) { $jobstep[$jobstepidx]->{tempfail} = 1; ban_node_by_slot($job_slot_index) if (defined($job_slot_index)); } @@ -1501,9 +1564,27 @@ sub preprocess_stderr } -sub process_stderr_final +# Read whatever is still available on its stderr+stdout pipes after +# the given child process has exited. +sub readfrompipes_after_exit { my $jobstepidx = shift; + + # The fact that the child has exited allows some convenient + # simplifications: (1) all data must have already been written, so + # there's no need to wait for more once sysread returns 0; (2) the + # total amount of data available is bounded by the pipe buffer size, + # so it's safe to read everything into one string. + my $buf; + while (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) { + $jobstep[$jobstepidx]->{stderr_at} = time; + $jobstep[$jobstepidx]->{stderr} .= $buf; + } + if ($jobstep[$jobstepidx]->{stdout_r}) { + while (0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) { + $jobstep[$jobstepidx]->{stdout_captured} .= $buf; + } + } preprocess_stderr ($jobstepidx); map { @@ -1557,9 +1638,10 @@ sub create_output_collection import arvados import sys print (arvados.api("v1").collections(). - create(body={"manifest_text": sys.stdin.read()}). + create(body={"manifest_text": sys.stdin.read(), + "owner_uuid": sys.argv[2]}). execute(num_retries=int(sys.argv[1]))["portable_data_hash"]) -}, retry_count()); +}, retry_count(), $Job->{owner_uuid}); my $task_idx = -1; my $manifest_size = 0; @@ -1710,7 +1792,7 @@ sub log_writer_start($) $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', '--stream', - '--retries', '3', + '--retries', '6', '--filename', $logfilename, '-'); $log_pipe_out_buf = ""; @@ -1744,7 +1826,7 @@ sub log_writer_finish() close($log_pipe_in); my $logger_failed = 0; - my $read_result = log_writer_read_output(120); + my $read_result = log_writer_read_output(600); if ($read_result == -1) { $logger_failed = -1; Log (undef, "timed out reading from 'arv-put'"); @@ -1807,6 +1889,7 @@ sub croak my ($package, $file, $line) = caller; my $message = "@_ at $file line $line\n"; Log (undef, $message); + release_allocation(); freeze() if @jobstep_todo; create_output_collection() if @jobstep_todo; cleanup(); @@ -1847,6 +1930,8 @@ sub save_meta }); Log(undef, "log collection is " . 
$log_coll->{portable_data_hash}); $Job->update_attributes('log' => $log_coll->{portable_data_hash}); + + return $log_coll->{portable_data_hash}; } @@ -1908,7 +1993,6 @@ sub freezeunquote return $s; } - sub srun_sync { my $srunargs = shift; @@ -1925,6 +2009,8 @@ sub srun_sync my ($stdout_r, $stdout_w); pipe $stdout_r, $stdout_w or croak("pipe() failed: $!"); + my $started_srun = scalar time; + my $srunpid = fork(); if ($srunpid == 0) { @@ -1932,8 +2018,8 @@ sub srun_sync close($stdout_r); fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec fcntl($stdout_w, F_SETFL, 0) or croak($!); - open(STDERR, ">&", $stderr_w); - open(STDOUT, ">&", $stdout_w); + open(STDERR, ">&", $stderr_w) or croak ($!); + open(STDOUT, ">&", $stdout_w) or croak ($!); srun ($srunargs, $execargs, $opts, $stdin); exit (1); } @@ -1963,16 +2049,22 @@ sub srun_sync if (!$busy || ($latest_refresh + 2 < scalar time)) { check_refresh_wanted(); check_squeue(); + check_sinfo(); } if (!$busy) { select(undef, undef, undef, 0.1); } + if (($started_srun + $srun_sync_timeout) < scalar time) { + # Exceeded general timeout for "srun_sync" operations, likely + # means something got stuck on the remote node. + Log(undef, "srun_sync exceeded timeout, will fail."); + $main::please_freeze = 1; + } killem(keys %proc) if $main::please_freeze; } my $exited = $?; - 1 while readfrompipes(); - process_stderr_final ($jobstepidx); + readfrompipes_after_exit ($jobstepidx); Log (undef, "$label: exit ".exit_status_s($exited)); @@ -1987,7 +2079,7 @@ sub srun_sync if ($main::please_freeze || $j->{tempfail}) { $exited ||= 255; } - return ($exited, $j->{stdout_captured}, $j->{stderr_captured}); + return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail}); } @@ -2068,13 +2160,18 @@ sub find_docker_image { } } } - if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) { + if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) { return ($streamname, $1); } else { return (undef, undef); } } +sub exit_retry_unlocked { + Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED); + exit(EX_RETRY_UNLOCKED); +} + sub retry_count { # Calculate the number of times an operation should be retried, # assuming exponential backoff, and that we're willing to retry as @@ -2107,8 +2204,22 @@ sub retry_op { # that can be retried, the second function will be called with # the current try count (0-based), next try time, and error message. my $operation = shift; - my $retry_callback = shift; + my $op_text = shift; my $retries = retry_count(); + my $retry_callback = sub { + my ($try_count, $next_try_at, $errmsg) = @_; + $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; + $errmsg =~ s/\s/ /g; + $errmsg =~ s/\s+$//; + my $retry_msg; + if ($next_try_at < time) { + $retry_msg = "Retrying."; + } else { + my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); + $retry_msg = "Retrying at $next_try_fmt."; + } + Log(undef, "$op_text failed: $errmsg. $retry_msg"); + }; foreach my $try_count (0..$retries) { my $next_try = time + (2 ** $try_count); my $result = eval { $operation->(@_); }; @@ -2131,25 +2242,11 @@ sub api_call { # This function will call that method, retrying as needed until # the current retry_count is exhausted, with a log on the first failure. 
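+  # Example, mirroring the nodes/list lookup added earlier in this diff:
+  #   my $resp = api_call('nodes/list', 'filters' => [['hostname', 'in', \@node]]);
+  # Each failed attempt is logged and retried after 2**try_count seconds,
+  # until retry_count() is exhausted.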
my $method_name = shift; - my $log_api_retry = sub { - my ($try_count, $next_try_at, $errmsg) = @_; - $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//; - $errmsg =~ s/\s/ /g; - $errmsg =~ s/\s+$//; - my $retry_msg; - if ($next_try_at < time) { - $retry_msg = "Retrying."; - } else { - my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at); - $retry_msg = "Retrying at $next_try_fmt."; - } - Log(undef, "API method $method_name failed: $errmsg. $retry_msg"); - }; my $method = $arv; foreach my $key (split(/\//, $method_name)) { $method = $method->{$key}; } - return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_); + return retry_op(sub { $method->execute(@_); }, "API method $method_name", @_); } sub exit_status_s { @@ -2423,8 +2520,8 @@ if ((-d $python_dir) and can_run("python2.7")) { # Hide messages from the install script (unless it fails: shell_or_die # will show $destdir.log in that case). -open(STDOUT, ">>", "$destdir.log"); -open(STDERR, ">&", STDOUT); +open(STDOUT, ">>", "$destdir.log") or die ($!); +open(STDERR, ">&", STDOUT) or die ($!); if (-e "$destdir/crunch_scripts/install") { shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir); @@ -2445,7 +2542,7 @@ close L; sub can_run { my $command_name = shift; - open(my $which, "-|", "which", $command_name); + open(my $which, "-|", "which", $command_name) or die ($!); while (<$which>) { } close($which); return ($? == 0);
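
Taken together, the srun_sync() changes above define one calling convention for the whole script: every caller now receives a fourth return value, $tempfail, which is set when stderr parsing, check_sinfo(), or the new --srun-sync-timeout detects an infrastructure problem rather than a task failure. Below is a minimal sketch of that pattern; it uses the real srun_sync(), exit_retry_unlocked(), exit_status_s(), and croak() interfaces from this file, but the srun arguments, command, and label are made-up examples that do not appear in this diff.

  # Sketch only: command and label are illustrative.
  my ($exited, $stdout, $stderr, $tempfail) = srun_sync(
    ["srun", "--nodes=1"],
    ["/bin/sh", "-ec", "echo hello"],
    {label => "example step"});
  if ($tempfail) {
    # Node lost, allocation revoked, or the srun_sync timeout fired:
    # exit EX_RETRY_UNLOCKED so crunch-dispatch re-queues the job.
    exit_retry_unlocked();
  } elsif ($exited != 0) {
    # A genuine task failure: report it and fail the job normally.
    croak("example step: ".exit_status_s($exited));
  }

This is why the --memory-swap probe, the non-root UID check, and the install-script run above all grew $tempfail branches that funnel into exit_retry_unlocked() instead of exiting nonzero directly.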