X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/86ddb15b162a0c29b6caa4370e5cedf27bbdd69a..d084ca24b06c598271844d2ba4c8c40f251c0999:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index b2c14d5b71..afca52cb73 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,4 +1,8 @@ #!/usr/bin/env perl +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- =head1 NAME @@ -126,14 +130,18 @@ my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; +my $cgroup_root = "/sys/fs/cgroup"; my $docker_bin = "docker.io"; +my $docker_run_args = ""; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, 'job-api-token=s' => \$job_api_token, 'no-clear-tmp' => \$no_clear_tmp, 'resume-stash=s' => \$resume_stash, + 'cgroup-root=s' => \$cgroup_root, 'docker-bin=s' => \$docker_bin, + 'docker-run-args=s' => \$docker_run_args, ); if (defined $job_api_token) { @@ -181,11 +189,12 @@ if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); -srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], - $cmd, - {fork => 1}); -if ($? != 0) { - Log(undef, "Sanity check failed: ".exit_status_s($?)); +my ($exited, $stdout, $stderr) = srun_sync( + ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"], + $cmd, + {label => "sanity check"}); +if ($exited != 0) { + Log(undef, "Sanity check failed: ".exit_status_s($exited)); exit EX_TEMPFAIL; } Log(undef, "Sanity check OK"); @@ -350,6 +359,7 @@ my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; my $squeue_checked = 0; +my $sinfo_checked = 0; my $latest_refresh = scalar time; @@ -384,28 +394,17 @@ my $nodelist = join(",", @node); my $git_tar_count = 0; if (!defined $no_clear_tmp) { - # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* - Log (undef, "Clean work dirs"); - - my $cleanpid = fork(); - if ($cleanpid == 0) - { - # Find FUSE mounts under $CRUNCH_TMP and unmount them. - # Then clean up work directories. - # TODO: When #5036 is done and widely deployed, we can limit mount's - # -t option to simply fuse.keep. - srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); - exit (1); - } - while (1) - { - last if $cleanpid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($cleanpid); - select (undef, undef, undef, 0.1); - } - if ($?) { - Log(undef, "Clean work dirs: exit ".exit_status_s($?)); + # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean + # up work directories crunch_tmp/work, crunch_tmp/opt, + # crunch_tmp/src*. + my ($exited, $stdout, $stderr) = srun_sync( + ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], + ['bash', '-ec', q{ +arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP} +rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid + }], + {label => "clean work dirs"}); + if ($exited != 0) { exit(EX_RETRY_UNLOCKED); } } @@ -413,41 +412,48 @@ if (!defined $no_clear_tmp) { # If this job requires a Docker image, install that. my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg); if ($docker_locator = $Job->{docker_image_locator}) { + Log (undef, "Install docker image $docker_locator"); ($docker_stream, $docker_hash) = find_docker_image($docker_locator); if (!$docker_hash) { croak("No Docker image hash found from locator $docker_locator"); } + Log (undef, "docker image hash is $docker_hash"); $docker_stream =~ s/^\.//; my $docker_install_script = qq{ -if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then - arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +loaded() { + id=\$($docker_bin inspect --format="{{.ID}}" \Q$docker_hash\E) || return 1 + echo "image ID is \$id" + [[ \${id} = \Q$docker_hash\E ]] +} +if loaded >&2 2>/dev/null; then + echo >&2 "image is already present" + exit 0 fi +echo >&2 "docker image is not present; loading" +arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load +if ! loaded >&2; then + echo >&2 "`docker load` exited 0, but image is not found (!)" + exit 1 +fi +echo >&2 "image loaded successfully" }; - my $docker_pid = fork(); - if ($docker_pid == 0) - { - srun (["srun", "--nodelist=" . join(',', @node)], - ["/bin/sh", "-ec", $docker_install_script]); - exit ($?); - } - while (1) - { - last if $docker_pid == waitpid (-1, WNOHANG); - freeze_if_want_freeze ($docker_pid); - select (undef, undef, undef, 0.1); - } - if ($? != 0) + + my ($exited, $stdout, $stderr) = srun_sync( + ["srun", "--nodelist=" . join(',', @node)], + ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script], + {label => "load docker image"}); + if ($exited != 0) { - croak("Installing Docker image from $docker_locator exited " - .exit_status_s($?)); + exit(EX_RETRY_UNLOCKED); } # Determine whether this version of Docker supports memory+swap limits. - srun(["srun", "--nodelist=" . $node[0]], - ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="], - {fork => 1}); - $docker_limitmem = ($? == 0); + ($exited, $stdout, $stderr) = srun_sync( + ["srun", "--nodes=1"], + [$docker_bin, 'run', '--help'], + {label => "check --memory-swap feature"}); + $docker_limitmem = ($stdout =~ /--memory-swap/); # Find a non-root Docker user to use. # Tries the default user for the container, then 'crunch', then 'nobody', @@ -457,20 +463,22 @@ fi # Docker containers. my @tryusers = ("", "crunch", "nobody"); foreach my $try_user (@tryusers) { + my $label; my $try_user_arg; if ($try_user eq "") { - Log(undef, "Checking if container default user is not UID 0"); + $label = "check whether default user is UID 0"; $try_user_arg = ""; } else { - Log(undef, "Checking if user '$try_user' is not UID 0"); + $label = "check whether user '$try_user' is UID 0"; $try_user_arg = "--user=$try_user"; } - srun(["srun", "--nodelist=" . $node[0]], - ["/bin/sh", "-ec", - "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " . - " test \$a -ne 0"], - {fork => 1}); - if ($? == 0) { + my ($exited, $stdout, $stderr) = srun_sync( + ["srun", "--nodes=1"], + ["/bin/sh", "-ec", + "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"], + {label => $label}); + chomp($stdout); + if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) { $dockeruserarg = $try_user_arg; if ($try_user eq "") { Log(undef, "Container will run with default user"); @@ -660,11 +668,9 @@ if (!defined $git_archive) { } } else { - my $install_exited; + my $exited; my $install_script_tries_left = 3; for (my $attempts = 0; $attempts < 3; $attempts++) { - Log(undef, "Run install script on all workers"); - my @srunargs = ("srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); @@ -672,59 +678,21 @@ else { "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive); - my ($install_stderr_r, $install_stderr_w); - pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!"); - set_nonblocking($install_stderr_r); - my $installpid = fork(); - if ($installpid == 0) - { - close($install_stderr_r); - fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec - open(STDOUT, ">&", $install_stderr_w); - open(STDERR, ">&", $install_stderr_w); - srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); - exit (1); - } - close($install_stderr_w); - # Tell freeze_if_want_freeze how to kill the child, otherwise the - # "waitpid(installpid)" loop won't get interrupted by a freeze: - $proc{$installpid} = {}; - my $stderr_buf = ''; - # Track whether anything appears on stderr other than slurm errors - # ("srun: ...") and the "starting: ..." message printed by the - # srun subroutine itself: + my ($stdout, $stderr); + ($exited, $stdout, $stderr) = srun_sync( + \@srunargs, \@execargs, + {label => "run install script on all workers"}, + $build_script . $git_archive); + my $stderr_anything_from_script = 0; - my $match_our_own_errors = '^(srun: error: |starting: \[)'; - while ($installpid != waitpid(-1, WNOHANG)) { - freeze_if_want_freeze ($installpid); - # Wait up to 0.1 seconds for something to appear on stderr, then - # do a non-blocking read. - my $bits = fhbits($install_stderr_r); - select ($bits, undef, $bits, 0.1); - if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf))) - { - while ($stderr_buf =~ /^(.*?)\n/) { - my $line = $1; - substr $stderr_buf, 0, 1+length($line), ""; - Log(undef, "stderr $line"); - if ($line !~ /$match_our_own_errors/) { - $stderr_anything_from_script = 1; - } - } - } - } - delete $proc{$installpid}; - $install_exited = $?; - close($install_stderr_r); - if (length($stderr_buf) > 0) { - if ($stderr_buf !~ /$match_our_own_errors/) { + for my $line (split(/\n/, $stderr)) { + if ($line !~ /^(srun: error: |starting: \[)/) { $stderr_anything_from_script = 1; } - Log(undef, "stderr $stderr_buf") } - Log (undef, "Install script exited ".exit_status_s($install_exited)); - last if $install_exited == 0 || $main::please_freeze; + last if $exited == 0 || $main::please_freeze; + # If the install script fails but doesn't print an error message, # the next thing anyone is likely to do is just run it again in # case it was a transient problem like "slurm communication fails @@ -740,7 +708,7 @@ else { unlink($tar_filename); } - if ($install_exited != 0) { + if ($exited != 0) { croak("Giving up"); } } @@ -755,6 +723,15 @@ foreach (split (/\n/, $Job->{knobs})) { Log (undef, "knob " . $_); } +my $resp = api_call( + 'nodes/list', + 'filters' => [['hostname', 'in', \@node]], + 'order' => 'hostname', + 'limit' => scalar(@node), + ); +for my $n (@{$resp->{items}}) { + Log(undef, "$n->{hostname} $n->{uuid} ".JSON::encode_json($n->{properties})); +} @@ -799,6 +776,7 @@ if ($initial_tasks_this_level < @node) { @freeslot = (0..$#slot); } my $round_num_freeslots = scalar(@freeslot); +print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n"; my %round_max_slots = (); for (my $ii = $#freeslot; $ii >= 0; $ii--) { @@ -870,11 +848,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname"; $ENV{"HOME"} = $ENV{"TASK_WORK"}; - $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}}; $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; + my $keep_mnt = $ENV{"TASK_WORK"}.".keep"; + $ENV{"GZIP"} = "-n"; my @srunargs = ( @@ -892,22 +871,30 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } my $command = - "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " - ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " - ."&& cd $ENV{CRUNCH_TMP} " + "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; " + ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E " + ."&& cd \Q$ENV{CRUNCH_TMP}\E " # These environment variables get used explicitly later in # $command. No tool is expected to read these values directly. .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' /dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi } + .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; } + .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi }; + + $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec "; + $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh"; + $ENV{TASK_KEEPMOUNT_TMP} = "$keep_mnt/tmp"; + if ($docker_hash) { my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}"; my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid"; - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; - $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy "; + $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; + $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy "; # We only set memory limits if Docker lets us limit both memory and swap. # Memory limits alone have been supported longer, but subprocesses tend # to get SIGKILL if they exceed that without any swap limit set. @@ -922,14 +909,18 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E "; $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E "; - # Currently, we make arv-mount's mount point appear at /keep - # inside the container (instead of using the same path as the - # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However, - # crunch scripts and utilities must not rely on this. They must - # use $TASK_KEEPMOUNT. + # Currently, we make the "by_pdh" directory in arv-mount's mount + # point appear at /keep inside the container (instead of using + # the same path as the host like we do with CRUNCH_SRC and + # CRUNCH_INSTALL). However, crunch scripts and utilities must + # not rely on this. They must use $TASK_KEEPMOUNT. $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E "; $ENV{TASK_KEEPMOUNT} = "/keep"; + # Ditto TASK_KEEPMOUNT_TMP, as /keep_tmp. + $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT_TMP}:/keep_tmp\E "; + $ENV{TASK_KEEPMOUNT_TMP} = "/keep_tmp"; + # TASK_WORK is almost exactly like a docker data volume: it # starts out empty, is writable, and persists until no # containers use it any more. We don't use --volumes-from to @@ -957,6 +948,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) # For now, use the same approach as TASK_WORK above. $ENV{"JOB_WORK"} = "/tmp/crunch-job-work"; + # Bind mount the crunchrunner binary and host TLS certificates file into + # the container. + $command .= '"${VOLUMES[@]}" '; + while (my ($env_key, $env_val) = each %ENV) { if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) { @@ -982,7 +977,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } } else { # Non-docker run - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; + $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 "; $command .= $stdbuf; $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } @@ -1000,11 +995,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) next; } shift @freeslot; - $proc{$childpid} = { jobstep => $id, - time => time, - slot => $childslot, - jobstepname => "$job_id.$id.$childpid", - }; + $proc{$childpid} = { + jobstepidx => $id, + time => time, + slot => $childslot, + jobstepname => "$job_id.$id.$childpid", + }; croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid}; $slot[$childslot]->{pid} = $childpid; @@ -1046,12 +1042,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) check_refresh_wanted(); check_squeue(); update_progress_stats(); - select (undef, undef, undef, 0.1); } elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty) { update_progress_stats(); } + if (!$gotsome) { + select (undef, undef, undef, 0.1); + } $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 && $_->{node}->{hold_count} < 4 } @slot); if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || @@ -1136,10 +1134,10 @@ freeze(); my $collated_output = save_output_collection(); Log (undef, "finish"); -save_meta(); +my $final_log = save_meta(); my $final_state; -if ($collated_output && $main::success) { +if ($collated_output && $final_log && $main::success) { $final_state = 'Complete'; } else { $final_state = 'Failed'; @@ -1170,109 +1168,119 @@ sub update_progress_stats sub reapchildren { - my $pid = waitpid (-1, WNOHANG); - return 0 if $pid <= 0; - - my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} - . "." - . $slot[$proc{$pid}->{slot}]->{cpu}); - my $jobstepid = $proc{$pid}->{jobstep}; - my $elapsed = time - $proc{$pid}->{time}; - my $Jobstep = $jobstep[$jobstepid]; - - my $childstatus = $?; - my $exitvalue = $childstatus >> 8; - my $exitinfo = "exit ".exit_status_s($childstatus); - $Jobstep->{'arvados_task'}->reload; - my $task_success = $Jobstep->{'arvados_task'}->{success}; - - Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success"); - - if (!defined $task_success) { - # task did not indicate one way or the other --> fail - Log($jobstepid, sprintf( - "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.", - exit_status_s($childstatus))); - $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; - $task_success = 0; - } + my $children_reaped = 0; + my @successful_task_uuids = (); - if (!$task_success) + while((my $pid = waitpid (-1, WNOHANG)) > 0) { - my $temporary_fail; - $temporary_fail ||= $Jobstep->{tempfail}; - $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); - - ++$thisround_failed; - ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; - - # Check for signs of a failed or misconfigured node - if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= - 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { - # Don't count this against jobstep failure thresholds if this - # node is already suspected faulty and srun exited quickly - if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && - $elapsed < 5) { - Log ($jobstepid, "blaming failure on suspect node " . - $slot[$proc{$pid}->{slot}]->{node}->{name}); - $temporary_fail ||= 1; - } - ban_node_by_slot($proc{$pid}->{slot}); + my $childstatus = $?; + + my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} + . "." + . $slot[$proc{$pid}->{slot}]->{cpu}); + my $jobstepidx = $proc{$pid}->{jobstepidx}; + + $children_reaped++; + my $elapsed = time - $proc{$pid}->{time}; + my $Jobstep = $jobstep[$jobstepidx]; + + my $exitvalue = $childstatus >> 8; + my $exitinfo = "exit ".exit_status_s($childstatus); + $Jobstep->{'arvados_task'}->reload; + my $task_success = $Jobstep->{'arvados_task'}->{success}; + + Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success"); + + if (!defined $task_success) { + # task did not indicate one way or the other --> fail + Log($jobstepidx, sprintf( + "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", + exit_status_s($childstatus))); + $Jobstep->{'arvados_task'}->{success} = 0; + $Jobstep->{'arvados_task'}->save; + $task_success = 0; } - Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', - ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary' : 'permanent', - $elapsed)); + if (!$task_success) + { + my $temporary_fail; + $temporary_fail ||= $Jobstep->{tempfail}; + $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); + + ++$thisround_failed; + ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; + + # Check for signs of a failed or misconfigured node + if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= + 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { + # Don't count this against jobstep failure thresholds if this + # node is already suspected faulty and srun exited quickly + if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && + $elapsed < 5) { + Log ($jobstepidx, "blaming failure on suspect node " . + $slot[$proc{$pid}->{slot}]->{node}->{name}); + $temporary_fail ||= 1; + } + ban_node_by_slot($proc{$pid}->{slot}); + } - if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { - # Give up on this task, and the whole job - $main::success = 0; + Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds', + ++$Jobstep->{'failures'}, + $temporary_fail ? 'temporary' : 'permanent', + $elapsed)); + + if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { + # Give up on this task, and the whole job + $main::success = 0; + } + # Put this task back on the todo queue + push @jobstep_todo, $jobstepidx; + $Job->{'tasks_summary'}->{'failed'}++; } - # Put this task back on the todo queue - push @jobstep_todo, $jobstepid; - $Job->{'tasks_summary'}->{'failed'}++; + else # task_success + { + push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid}; + ++$thisround_succeeded; + $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; + push @jobstep_done, $jobstepidx; + Log ($jobstepidx, "success in $elapsed seconds"); + } + $Jobstep->{exitcode} = $childstatus; + $Jobstep->{finishtime} = time; + $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); + $Jobstep->{'arvados_task'}->save; + process_stderr_final ($jobstepidx); + Log ($jobstepidx, sprintf("task output (%d bytes): %s", + length($Jobstep->{'arvados_task'}->{output}), + $Jobstep->{'arvados_task'}->{output})); + + close $reader{$jobstepidx}; + delete $reader{$jobstepidx}; + delete $slot[$proc{$pid}->{slot}]->{pid}; + push @freeslot, $proc{$pid}->{slot}; + delete $proc{$pid}; + + $progress_is_dirty = 1; } - else + + if (scalar(@successful_task_uuids) > 0) { - ++$thisround_succeeded; - $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; - push @jobstep_done, $jobstepid; - Log ($jobstepid, "success in $elapsed seconds"); - } - $Jobstep->{exitcode} = $childstatus; - $Jobstep->{finishtime} = time; - $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); - $Jobstep->{'arvados_task'}->save; - process_stderr ($jobstepid, $task_success); - Log ($jobstepid, sprintf("task output (%d bytes): %s", - length($Jobstep->{'arvados_task'}->{output}), - $Jobstep->{'arvados_task'}->{output})); - - close $reader{$jobstepid}; - delete $reader{$jobstepid}; - delete $slot[$proc{$pid}->{slot}]->{pid}; - push @freeslot, $proc{$pid}->{slot}; - delete $proc{$pid}; - - if ($task_success) { + Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids))); # Load new tasks my $newtask_list = []; my $newtask_results; do { $newtask_results = api_call( "job_tasks/list", - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, + 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]], 'order' => 'qsequence', 'offset' => scalar(@$newtask_list), - ); + ); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); + Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list))); foreach my $arvados_task (@$newtask_list) { my $jobstep = { 'level' => $arvados_task->{'sequence'}, @@ -1284,14 +1292,16 @@ sub reapchildren } } - $progress_is_dirty = 1; - 1; + return $children_reaped; } sub check_refresh_wanted { my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; - if (@stat && $stat[9] > $latest_refresh) { + if (@stat && + $stat[9] > $latest_refresh && + # ...and we have actually locked the job record... + $job_id eq $Job->{'uuid'}) { $latest_refresh = scalar time; my $Job2 = api_call("jobs/get", uuid => $jobspec); for my $attr ('cancelled_at', @@ -1329,9 +1339,13 @@ sub check_squeue # squeue check interval (15s) this should make the squeue check an # infrequent event. my $silent_procs = 0; - for my $jobstep (values %proc) + for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc) { - if ($jobstep->{stderr_at} < $last_squeue_check) + if (!exists($js->{stderr_at})) + { + $js->{stderr_at} = 0; + } + if ($js->{stderr_at} < $last_squeue_check) { $silent_procs++; } @@ -1339,17 +1353,18 @@ sub check_squeue return if $silent_procs == 0; # use killem() on procs whose killtime is reached - while (my ($pid, $jobstep) = each %proc) + while (my ($pid, $procinfo) = each %proc) { - if (exists $jobstep->{killtime} - && $jobstep->{killtime} <= time - && $jobstep->{stderr_at} < $last_squeue_check) + my $js = $jobstep[$procinfo->{jobstepidx}]; + if (exists $procinfo->{killtime} + && $procinfo->{killtime} <= time + && $js->{stderr_at} < $last_squeue_check) { my $sincewhen = ""; - if ($jobstep->{stderr_at}) { - $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + if ($js->{stderr_at}) { + $sincewhen = " in last " . (time - $js->{stderr_at}) . "s"; } - Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); + Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); killem ($pid); } } @@ -1384,12 +1399,12 @@ sub check_squeue } # Check for child procs >60s old and not mentioned by squeue. - while (my ($pid, $jobstep) = each %proc) + while (my ($pid, $procinfo) = each %proc) { - if ($jobstep->{time} < time - 60 - && $jobstep->{jobstepname} - && !exists $ok{$jobstep->{jobstepname}} - && !exists $jobstep->{killtime}) + if ($procinfo->{time} < time - 60 + && $procinfo->{jobstepname} + && !exists $ok{$procinfo->{jobstepname}} + && !exists $procinfo->{killtime}) { # According to slurm, this task has ended (successfully or not) # -- but our srun child hasn't exited. First we must wait (30 @@ -1398,12 +1413,43 @@ sub check_squeue # terminated, we'll conclude some slurm communication # error/delay has caused the task to die without notifying srun, # and we'll kill srun ourselves. - $jobstep->{killtime} = time + 30; - Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited"); + $procinfo->{killtime} = time + 30; + Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited"); } } } +sub check_sinfo +{ + # If a node fails in a multi-node "srun" call during job setup, the call + # may hang instead of exiting with a nonzero code. This function checks + # "sinfo" for the health of the nodes that were allocated and ensures that + # they are all still in the "alloc" state. If a node that is allocated to + # this job is not in "alloc" state, then set please_freeze. + # + # This is only called from srun_sync() for node configuration. If a + # node fails doing actual work, there are other recovery mechanisms. + + # Do not call `sinfo` more than once every 15 seconds. + return if $sinfo_checked > time - 15; + $sinfo_checked = time; + + # The output format "%t" means output node states. + my @sinfo = `sinfo --nodes=\Q$ENV{SLURM_NODELIST}\E --noheader -o "%t"`; + if ($? != 0) + { + Log(undef, "warning: sinfo exit status $? ($!)"); + return; + } + chop @sinfo; + + foreach (@sinfo) + { + if ($_ != "alloc" && $_ != "alloc*") { + $main::please_freeze = 1; + } + } +} sub release_allocation { @@ -1418,64 +1464,105 @@ sub release_allocation sub readfrompipes { my $gotsome = 0; - foreach my $job (keys %reader) + my %fd_job; + my $sel = IO::Select->new(); + foreach my $jobstepidx (keys %reader) + { + my $fd = $reader{$jobstepidx}; + $sel->add($fd); + $fd_job{$fd} = $jobstepidx; + + if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) { + $sel->add($stdout_fd); + $fd_job{$stdout_fd} = $jobstepidx; + } + } + # select on all reader fds with 0.1s timeout + my @ready_fds = $sel->can_read(0.1); + foreach my $fd (@ready_fds) { my $buf; - while (0 < sysread ($reader{$job}, $buf, 8192)) + if (0 < sysread ($fd, $buf, 65536)) { + $gotsome = 1; print STDERR $buf if $ENV{CRUNCH_DEBUG}; - $jobstep[$job]->{stderr_at} = time; - $jobstep[$job]->{stderr} .= $buf; - preprocess_stderr ($job); - if (length ($jobstep[$job]->{stderr}) > 16384) + + my $jobstepidx = $fd_job{$fd}; + if ($jobstep[$jobstepidx]->{stdout_r} == $fd) { + $jobstep[$jobstepidx]->{stdout_captured} .= $buf; + next; + } + + $jobstep[$jobstepidx]->{stderr_at} = time; + $jobstep[$jobstepidx]->{stderr} .= $buf; + + # Consume everything up to the last \n + preprocess_stderr ($jobstepidx); + + if (length ($jobstep[$jobstepidx]->{stderr}) > 16384) { - substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; + # If we get a lot of stderr without a newline, chop off the + # front to avoid letting our buffer grow indefinitely. + substr ($jobstep[$jobstepidx]->{stderr}, + 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = ""; } - $gotsome = 1; } } return $gotsome; } +# Consume all full lines of stderr for a jobstep. Everything after the +# last newline will remain in $jobstep[$jobstepidx]->{stderr} after +# returning. sub preprocess_stderr { - my $job = shift; + my $jobstepidx = shift; + # slotindex is only defined for children running Arvados job tasks. + # Be prepared to handle the undef case (for setup srun calls, etc.). + my $job_slot_index = $jobstep[$jobstepidx]->{slotindex}; - while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { + while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) { my $line = $1; - substr $jobstep[$job]->{stderr}, 0, 1+length($line), ""; - Log ($job, "stderr $line"); - if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) { - # whoa. + substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), ""; + Log ($jobstepidx, "stderr $line"); + if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) { + # If the allocation is revoked, we can't possibly continue, so mark all + # nodes as failed. This will cause the overall exit code to be + # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run + # this job. $main::please_freeze = 1; + foreach my $st (@slot) { + $st->{node}->{fail_count}++; + } } - elsif ($line =~ /srun: error: Node failure on/) { - my $job_slot_index = $jobstep[$job]->{slotindex}; - $slot[$job_slot_index]->{node}->{fail_count}++; - $jobstep[$job]->{tempfail} = 1; - ban_node_by_slot($job_slot_index); + elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) { + $jobstep[$jobstepidx]->{tempfail} = 1; + if (defined($job_slot_index)) { + $slot[$job_slot_index]->{node}->{fail_count}++; + ban_node_by_slot($job_slot_index); + } } - elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) { - $jobstep[$job]->{tempfail} = 1; - ban_node_by_slot($jobstep[$job]->{slotindex}); + elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) { + $jobstep[$jobstepidx]->{tempfail} = 1; + ban_node_by_slot($job_slot_index) if (defined($job_slot_index)); } - elsif ($line =~ /arvados\.errors\.Keep/) { - $jobstep[$job]->{tempfail} = 1; + elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) { + $jobstep[$jobstepidx]->{tempfail} = 1; } } } -sub process_stderr +sub process_stderr_final { - my $job = shift; - my $task_success = shift; - preprocess_stderr ($job); + my $jobstepidx = shift; + preprocess_stderr ($jobstepidx); map { - Log ($job, "stderr $_"); - } split ("\n", $jobstep[$job]->{stderr}); + Log ($jobstepidx, "stderr $_"); + } split ("\n", $jobstep[$jobstepidx]->{stderr}); + $jobstep[$jobstepidx]->{stderr} = ''; } sub fetch_block @@ -1523,9 +1610,10 @@ sub create_output_collection import arvados import sys print (arvados.api("v1").collections(). - create(body={"manifest_text": sys.stdin.read()}). + create(body={"manifest_text": sys.stdin.read(), + "owner_uuid": sys.argv[2]}). execute(num_retries=int(sys.argv[1]))["portable_data_hash"]) -}, retry_count()); +}, retry_count(), $Job->{owner_uuid}); my $task_idx = -1; my $manifest_size = 0; @@ -1613,7 +1701,7 @@ sub killem } if (!exists $proc{$_}->{"sent_$sig"}) { - Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_"); + Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_"); kill $sig, $_; select (undef, undef, undef, 0.1); if ($sig == 2) @@ -1676,7 +1764,7 @@ sub log_writer_start($) $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', '--stream', - '--retries', '3', + '--retries', '6', '--filename', $logfilename, '-'); $log_pipe_out_buf = ""; @@ -1710,7 +1798,7 @@ sub log_writer_finish() close($log_pipe_in); my $logger_failed = 0; - my $read_result = log_writer_read_output(120); + my $read_result = log_writer_read_output(600); if ($read_result == -1) { $logger_failed = -1; Log (undef, "timed out reading from 'arv-put'"); @@ -1737,16 +1825,21 @@ sub log_writer_is_active() { return $log_pipe_pid; } -sub Log # ($jobstep_id, $logmessage) +sub Log # ($jobstepidx, $logmessage) { - if ($_[1] =~ /\n/) { + my ($jobstepidx, $logmessage) = @_; + if ($logmessage =~ /\n/) { for my $line (split (/\n/, $_[1])) { - Log ($_[0], $line); + Log ($jobstepidx, $line); } return; } my $fh = select STDERR; $|=1; select $fh; - my $message = sprintf ("%s %d %s %s", $job_id, $$, @_); + my $task_qseq = ''; + if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) { + $task_qseq = $jobstepidx; + } + my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage); $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; @@ -1768,6 +1861,7 @@ sub croak my ($package, $file, $line) = caller; my $message = "@_ at $file line $line\n"; Log (undef, $message); + release_allocation(); freeze() if @jobstep_todo; create_output_collection() if @jobstep_todo; cleanup(); @@ -1808,6 +1902,8 @@ sub save_meta }); Log(undef, "log collection is " . $log_coll->{portable_data_hash}); $Job->update_attributes('log' => $log_coll->{portable_data_hash}); + + return $log_coll->{portable_data_hash}; } @@ -1869,6 +1965,88 @@ sub freezeunquote return $s; } +sub srun_sync +{ + my $srunargs = shift; + my $execargs = shift; + my $opts = shift || {}; + my $stdin = shift; + + my $label = exists $opts->{label} ? $opts->{label} : "@$execargs"; + Log (undef, "$label: start"); + + my ($stderr_r, $stderr_w); + pipe $stderr_r, $stderr_w or croak("pipe() failed: $!"); + + my ($stdout_r, $stdout_w); + pipe $stdout_r, $stdout_w or croak("pipe() failed: $!"); + + my $srunpid = fork(); + if ($srunpid == 0) + { + close($stderr_r); + close($stdout_r); + fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec + fcntl($stdout_w, F_SETFL, 0) or croak($!); + open(STDERR, ">&", $stderr_w); + open(STDOUT, ">&", $stdout_w); + srun ($srunargs, $execargs, $opts, $stdin); + exit (1); + } + close($stderr_w); + close($stdout_w); + + set_nonblocking($stderr_r); + set_nonblocking($stdout_r); + + # Add entries to @jobstep and %proc so check_squeue() and + # freeze_if_want_freeze() can treat it like a job task process. + push @jobstep, { + stderr => '', + stderr_at => 0, + stderr_captured => '', + stdout_r => $stdout_r, + stdout_captured => '', + }; + my $jobstepidx = $#jobstep; + $proc{$srunpid} = { + jobstepidx => $jobstepidx, + }; + $reader{$jobstepidx} = $stderr_r; + + while ($srunpid != waitpid ($srunpid, WNOHANG)) { + my $busy = readfrompipes(); + if (!$busy || ($latest_refresh + 2 < scalar time)) { + check_refresh_wanted(); + check_squeue(); + check_sinfo(); + } + if (!$busy) { + select(undef, undef, undef, 0.1); + } + killem(keys %proc) if $main::please_freeze; + } + my $exited = $?; + + 1 while readfrompipes(); + process_stderr_final ($jobstepidx); + + Log (undef, "$label: exit ".exit_status_s($exited)); + + close($stdout_r); + close($stderr_r); + delete $proc{$srunpid}; + delete $reader{$jobstepidx}; + + my $j = pop @jobstep; + # If the srun showed signs of tempfail, ensure the caller treats that as a + # failure case. + if ($main::please_freeze || $j->{tempfail}) { + $exited ||= 255; + } + return ($exited, $j->{stdout_captured}, $j->{stderr_captured}); +} + sub srun { @@ -1947,7 +2125,7 @@ sub find_docker_image { } } } - if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) { + if (defined($filename) and ($filename =~ /^((?:sha256:)?[0-9A-Fa-f]{64})\.tar$/)) { return ($streamname, $1); } else { return (undef, undef);