X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b405f0f487f35f62d8362dc06981b83176b77d44..57647945b08a74bab02fed6adf947eb9adb8321f:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index a2e42d6fac..5e6c3a084e 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,4 +1,8 @@ #!/usr/bin/env perl +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- =head1 NAME @@ -185,7 +189,7 @@ if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); -my ($exited, $stdout, $stderr) = srun_sync( +my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], $cmd, {label => "sanity check"}); @@ -393,15 +397,15 @@ if (!defined $no_clear_tmp) { # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean # up work directories crunch_tmp/work, crunch_tmp/opt, # crunch_tmp/src*. - # - # TODO: When #5036 is done and widely deployed, we can limit mount's - # -t option to simply fuse.keep. - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'], + ['bash', '-ec', q{ +arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP} +rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid + }], {label => "clean work dirs"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } } @@ -417,34 +421,41 @@ if ($docker_locator = $Job->{docker_image_locator}) { Log (undef, "docker image hash is $docker_hash"); $docker_stream =~ s/^\.//; my $docker_install_script = qq{ -if $docker_bin images -q --no-trunc --all | grep -xF \Q$docker_hash\E >/dev/null; then - exit 0 +loaded() { + id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1 + echo "image ID is \$id" + [[ \${id} = \Q$docker_hash\E ]] +} +if loaded >&2 2>/dev/null; then + echo >&2 "image is already present" + exit 0 fi -declare -a exit_codes=("\${PIPESTATUS[@]}") -if [ 0 != "\${exit_codes[0]}" ]; then - exit "\${exit_codes[0]}" # `docker images` failed -elif [ 1 != "\${exit_codes[1]}" ]; then - exit "\${exit_codes[1]}" # `grep` encountered an error -else - # Everything worked fine, but grep didn't find the image on this host. - arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +echo >&2 "docker image is not present; loading" +arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +if ! loaded >&2; then + echo >&2 "`docker load` exited 0, but image is not found (!)" + exit 1 fi +echo >&2 "image loaded successfully" }; - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodelist=" . join(',', @node)], ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script], {label => "load docker image"}); if ($exited != 0) { - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } # Determine whether this version of Docker supports memory+swap limits. - ($exited, $stdout, $stderr) = srun_sync( + ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], [$docker_bin, 'run', '--help'], {label => "check --memory-swap feature"}); + if ($tempfail) { + exit_retry_unlocked(); + } $docker_limitmem = ($stdout =~ /--memory-swap/); # Find a non-root Docker user to use. @@ -464,7 +475,7 @@ fi $label = "check whether user '$try_user' is UID 0"; $try_user_arg = "--user=$try_user"; } - my ($exited, $stdout, $stderr) = srun_sync( + my ($exited, $stdout, $stderr, $tempfail) = srun_sync( ["srun", "--nodes=1"], ["/bin/sh", "-ec", "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"], @@ -478,6 +489,8 @@ fi Log(undef, "Container will run with $dockeruserarg"); } last; + } elsif ($tempfail) { + exit_retry_unlocked(); } } @@ -670,11 +683,14 @@ else { "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); - my ($stdout, $stderr); - ($exited, $stdout, $stderr) = srun_sync( + my ($stdout, $stderr, $tempfail); + ($exited, $stdout, $stderr, $tempfail) = srun_sync( \@srunargs, \@execargs, {label => "run install script on all workers"}, - $build_script . $git_archive); + $build_script . $git_archive); + if ($tempfail) { + exit_retry_unlocked(); + } my $stderr_anything_from_script = 0; for my $line (split(/\n/, $stderr)) { @@ -715,6 +731,15 @@ foreach (split (/\n/, $Job->{knobs})) { Log (undef, "knob " . $_); } +my $resp = api_call( + 'nodes/list', + 'filters' => [['hostname', 'in', \@node]], + 'order' => 'hostname', + 'limit' => scalar(@node), + ); +for my $n (@{$resp->{items}}) { + Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties})); +} @@ -1100,7 +1125,7 @@ if (!defined $main::success) } elsif ($working_slot_count < 1) { save_output_collection(); save_meta(); - exit(EX_RETRY_UNLOCKED); + exit_retry_unlocked(); } elsif ($thisround_succeeded == 0 && ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; @@ -1117,10 +1142,10 @@ freeze(); my $collated_output = save_output_collection(); Log (undef, "finish"); -save_meta(); +my $final_log = save_meta(); my $final_state; -if ($collated_output && $main::success) { +if ($collated_output && $final_log && $main::success) { $final_state = 'Complete'; } else { $final_state = 'Failed'; @@ -1519,7 +1544,7 @@ sub preprocess_stderr $st->{node}->{fail_count}++; } } - elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) { + elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) { $jobstep[$jobstepidx]->{tempfail} = 1; if (defined($job_slot_index)) { $slot[$job_slot_index]->{node}->{fail_count}++; @@ -1593,9 +1618,10 @@ sub create_output_collection import arvados import sys print (arvados.api("v1").collections(). - create(body={"manifest_text": sys.stdin.read()}). + create(body={"manifest_text": sys.stdin.read(), + "owner_uuid": sys.argv[2]}). execute(num_retries=int(sys.argv[1]))["portable_data_hash"]) -}, retry_count()); +}, retry_count(), $Job->{owner_uuid}); my $task_idx = -1; my $manifest_size = 0; @@ -1746,7 +1772,7 @@ sub log_writer_start($) $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', '--stream', - '--retries', '3', + '--retries', '6', '--filename', $logfilename, '-'); $log_pipe_out_buf = ""; @@ -1843,6 +1869,7 @@ sub croak my ($package, $file, $line) = caller; my $message = "@_ at $file line $line\n"; Log (undef, $message); + release_allocation(); freeze() if @jobstep_todo; create_output_collection() if @jobstep_todo; cleanup(); @@ -1883,6 +1910,8 @@ sub save_meta }); Log(undef, "log collection is " . $log_coll->{portable_data_hash}); $Job->update_attributes('log' => $log_coll->{portable_data_hash}); + + return $log_coll->{portable_data_hash}; } @@ -2023,7 +2052,7 @@ sub srun_sync if ($main::please_freeze || $j->{tempfail}) { $exited ||= 255; } - return ($exited, $j->{stdout_captured}, $j->{stderr_captured}); + return ($exited, $j->{stdout_captured}, $j->{stderr_captured}, $j->{tempfail}); } @@ -2111,6 +2140,11 @@ sub find_docker_image { } } +sub exit_retry_unlocked { + Log(undef, "Transient failure with lock acquired; asking for re-dispatch by exiting ".EX_RETRY_UNLOCKED); + exit(EX_RETRY_UNLOCKED); +} + sub retry_count { # Calculate the number of times an operation should be retried, # assuming exponential backoff, and that we're willing to retry as