X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/de1e5fd5605aaf11b96ef411201e11ac767fe8ba..714c555bda26a6a27fad7caef382d1d6705ad215:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 786e18f246..28da66d0ca 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- =head1 NAME @@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree ); use constant TASK_TEMPFAIL => 111; use constant EX_TEMPFAIL => 75; +use constant EX_RETRY_UNLOCKED => 93; $ENV{"TMPDIR"} ||= "/tmp"; unless (defined $ENV{"CRUNCH_TMP"}) { @@ -125,7 +126,7 @@ my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; -my $docker_bin = "/usr/bin/docker.io"; +my $docker_bin = "docker.io"; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, @@ -168,8 +169,7 @@ if ($jobspec =~ /^[-a-z\d]+$/) } else { - $Job = JSON::decode_json($jobspec); - $local_job = 1; + $local_job = JSON::decode_json($jobspec); } @@ -177,7 +177,7 @@ else # at least able to run basic commands: they aren't down or severely # misconfigured. my $cmd = ['true']; -if ($Job->{docker_image_locator}) { +if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); @@ -207,15 +207,15 @@ else { if (!$resume_stash) { - map { croak ("No $_ specified") unless $Job->{$_} } + map { croak ("No $_ specified") unless $local_job->{$_} } qw(script script_version script_parameters); } - $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; - $Job->{'started_at'} = gmtime; - $Job->{'state'} = 'Running'; + $local_job->{'is_locked_by_uuid'} = $User->{'uuid'}; + $local_job->{'started_at'} = gmtime; + $local_job->{'state'} = 'Running'; - $Job = api_call("jobs/create", job => $Job); + $Job = api_call("jobs/create", job => $local_job); } $job_id = $Job->{'uuid'}; @@ -292,9 +292,16 @@ foreach (@sinfo) { Log (undef, "node $nodename - $ncpus slots"); my $node = { name => $nodename, - ncpus => $ncpus, - losing_streak => 0, - hold_until => 0 }; + ncpus => $ncpus, + # The number of consecutive times a task has been dispatched + # to this node and failed. + losing_streak => 0, + # The number of consecutive times that SLURM has reported + # a node failure since the last successful task. + fail_count => 0, + # Don't dispatch work to this node until this time + # (in seconds since the epoch) has passed. + hold_until => 0 }; foreach my $cpu (1..$ncpus) { push @slot, { node => $node, @@ -383,12 +390,12 @@ if (!defined $no_clear_tmp) { my $cleanpid = fork(); if ($cleanpid == 0) { - # Find FUSE mounts that look like Keep mounts (the mount path has the - # word "keep") and unmount them. Then clean up work directories. - # TODO: When #5036 is done and widely deployed, we can get rid of the - # regular expression and just unmount everything with type fuse.keep. + # Find FUSE mounts under $CRUNCH_TMP and unmount them. + # Then clean up work directories. + # TODO: When #5036 is done and widely deployed, we can limit mount's + # -t option to simply fuse.keep. srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); + ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); exit (1); } while (1) @@ -397,11 +404,14 @@ if (!defined $no_clear_tmp) { freeze_if_want_freeze ($cleanpid); select (undef, undef, undef, 0.1); } - Log (undef, "Cleanup command exited ".exit_status_s($?)); + if ($?) { + Log(undef, "Clean work dirs: exit ".exit_status_s($?)); + exit(EX_RETRY_UNLOCKED); + } } # If this job requires a Docker image, install that. -my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem); +my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg); if ($docker_locator = $Job->{docker_image_locator}) { ($docker_stream, $docker_hash) = find_docker_image($docker_locator); if (!$docker_hash) @@ -439,6 +449,42 @@ fi {fork => 1}); $docker_limitmem = ($? == 0); + # Find a non-root Docker user to use. + # Tries the default user for the container, then 'crunch', then 'nobody', + # testing for whether the actual user id is non-zero. This defends against + # mistakes but not malice, but we intend to harden the security in the future + # so we don't want anyone getting used to their jobs running as root in their + # Docker containers. + my @tryusers = ("", "crunch", "nobody"); + foreach my $try_user (@tryusers) { + my $try_user_arg; + if ($try_user eq "") { + Log(undef, "Checking if container default user is not UID 0"); + $try_user_arg = ""; + } else { + Log(undef, "Checking if user '$try_user' is not UID 0"); + $try_user_arg = "--user=$try_user"; + } + srun(["srun", "--nodelist=" . $node[0]], + ["/bin/sh", "-ec", + "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " . + " test \$a -ne 0"], + {fork => 1}); + if ($? == 0) { + $dockeruserarg = $try_user_arg; + if ($try_user eq "") { + Log(undef, "Container will run with default user"); + } else { + Log(undef, "Container will run with $dockeruserarg"); + } + last; + } + } + + if (!defined $dockeruserarg) { + croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container."); + } + if ($Job->{arvados_sdk_version}) { # The job also specifies an Arvados SDK version. Add the SDKs to the # tar file for the build script to install. @@ -588,7 +634,7 @@ else { unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) { croak("`$gitcmd rev-list` exited " .exit_status_s($?) - .", '$treeish' not found. Giving up."); + .", '$treeish' not found, giving up"); } $commit = $1; Log(undef, "Version $treeish is commit $commit"); @@ -721,6 +767,7 @@ ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; +my $working_slot_count = scalar(@slot); @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; @@ -773,6 +820,9 @@ update_progress_stats(); THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { + # Don't create new tasks if we already know the job's final result. + last if defined($main::success); + my $id = $jobstep_todo[$todo_ptr]; my $Jobstep = $jobstep[$id]; if ($Jobstep->{level} != $level) @@ -833,6 +883,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); + + my $stdbuf = " stdbuf --output=0 --error=0 "; + my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " @@ -843,12 +896,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' {"script"}; + + if ($Job->{arvados_sdk_version}) { + $command .= $stdbuf; + $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E"; + } else { + $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " . + "if which stdbuf >/dev/null ; then " . + " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" . + " else " . + " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" . + " fi\'"; + } } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; - $command .= "stdbuf --output=0 --error=0 "; + $command .= $stdbuf; $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } @@ -950,6 +1007,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $Jobstep->{slotindex} = $childslot; delete $Jobstep->{stderr}; delete $Jobstep->{finishtime}; + delete $Jobstep->{tempfail}; $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime}); $Jobstep->{'arvados_task'}->save; @@ -963,7 +1021,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) || ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { - last THISROUND if $main::please_freeze || defined($main::success); + last THISROUND if $main::please_freeze; if ($main::please_info) { $main::please_info = 0; @@ -975,17 +1033,19 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my $gotsome = readfrompipes () + reapchildren (); - if (!$gotsome) + if (!$gotsome || ($latest_refresh + 2 < scalar time)) { check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); } - elsif (time - $progress_stats_updated >= 30) + elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty) { update_progress_stats(); } + $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 && + $_->{node}->{hold_count} < 4 } @slot); if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { @@ -1009,10 +1069,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } # give up if no nodes are succeeding - if (!grep { $_->{node}->{losing_streak} == 0 && - $_->{node}->{hold_count} < 4 } @slot) { - my $message = "Every node has failed -- giving up on this round"; - Log (undef, $message); + if ($working_slot_count < 1) { + Log(undef, "Every node has failed -- giving up"); last THISROUND; } } @@ -1048,18 +1106,18 @@ freeze_if_want_freeze(); if (!defined $main::success) { - if (@jobstep_todo && - $thisround_succeeded == 0 && - ($thisround_failed == 0 || $thisround_failed > 4)) - { + if (!@jobstep_todo) { + $main::success = 1; + } elsif ($working_slot_count < 1) { + save_output_collection(); + save_meta(); + exit(EX_RETRY_UNLOCKED); + } elsif ($thisround_succeeded == 0 && + ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $main::success = 0; } - if (!@jobstep_todo) - { - $main::success = 1; - } } goto ONELEVEL if !defined $main::success; @@ -1067,16 +1125,7 @@ goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); -my $collated_output = &create_output_collection(); - -if (!$collated_output) { - Log (undef, "Failed to write output collection"); -} -else { - Log(undef, "job output $collated_output"); - $Job->update_attributes('output' => $collated_output); -} - +my $collated_output = save_output_collection(); Log (undef, "finish"); save_meta(); @@ -1098,8 +1147,8 @@ sub update_progress_stats $progress_stats_updated = time; return if !$progress_is_dirty; my ($todo, $done, $running) = (scalar @jobstep_todo, - scalar @jobstep_done, - scalar @slot - scalar @freeslot - scalar @holdslot); + scalar @jobstep_done, + scalar keys(%proc)); $Job->{'tasks_summary'} ||= {}; $Job->{'tasks_summary'}->{'todo'} = $todo; $Job->{'tasks_summary'}->{'done'} = $done; @@ -1133,6 +1182,9 @@ sub reapchildren if (!defined $task_success) { # task did not indicate one way or the other --> fail + Log($jobstepid, sprintf( + "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.", + exit_status_s($childstatus))); $Jobstep->{'arvados_task'}->{success} = 0; $Jobstep->{'arvados_task'}->save; $task_success = 0; @@ -1141,7 +1193,7 @@ sub reapchildren if (!$task_success) { my $temporary_fail; - $temporary_fail ||= $Jobstep->{node_fail}; + $temporary_fail ||= $Jobstep->{tempfail}; $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); ++$thisround_failed; @@ -1179,6 +1231,7 @@ sub reapchildren ++$thisround_succeeded; $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; push @jobstep_done, $jobstepid; Log ($jobstepid, "success in $elapsed seconds"); } @@ -1299,8 +1352,15 @@ sub check_squeue return; } - # get a list of steps still running - my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%i %j' --noheader`; + # Get a list of steps still running. Note: squeue(1) says --steps + # selects a format (which we override anyway) and allows us to + # specify which steps we're interested in (which we don't). + # Importantly, it also changes the meaning of %j from "job name" to + # "step name" and (although this isn't mentioned explicitly in the + # docs) switches from "one line per job" mode to "one line per step" + # mode. Without it, we'd just get a list of one job, instead of a + # list of N steps. + my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`; if ($? != 0) { Log(undef, "warning: squeue exit status $? ($!)"); @@ -1310,15 +1370,9 @@ sub check_squeue # which of my jobsteps are running, according to squeue? my %ok; - foreach (@squeue) + for my $jobstepname (@squeue) { - if (/^(\d+)\.(\d+) (\S+)/) - { - if ($1 eq $ENV{SLURM_JOB_ID}) - { - $ok{$3} = 1; - } - } + $ok{$jobstepname} = 1; } # Check for child procs >60s old and not mentioned by squeue. @@ -1388,10 +1442,19 @@ sub preprocess_stderr # whoa. $main::please_freeze = 1; } - elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) { - $jobstep[$job]->{node_fail} = 1; + elsif ($line =~ /srun: error: Node failure on/) { + my $job_slot_index = $jobstep[$job]->{slotindex}; + $slot[$job_slot_index]->{node}->{fail_count}++; + $jobstep[$job]->{tempfail} = 1; + ban_node_by_slot($job_slot_index); + } + elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) { + $jobstep[$job]->{tempfail} = 1; ban_node_by_slot($jobstep[$job]->{slotindex}); } + elsif ($line =~ /arvados\.errors\.Keep/) { + $jobstep[$job]->{tempfail} = 1; + } } } @@ -1510,6 +1573,20 @@ print (arvados.api("v1").collections(). return $joboutput; } +# Calls create_output_collection, logs the result, and returns it. +# If that was successful, save that as the output in the job record. +sub save_output_collection { + my $collated_output = create_output_collection(); + + if (!$collated_output) { + Log(undef, "Failed to write output collection"); + } + else { + Log(undef, "job output $collated_output"); + $Job->update_attributes('output' => $collated_output); + } + return $collated_output; +} sub killem { @@ -1555,6 +1632,8 @@ sub fhbits # Send log output to Keep via arv-put. # # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. +# $log_pipe_out_buf is a string containing all output read from arv-put so far. +# $log_pipe_out_select is an IO::Select object around $log_pipe_out. # $log_pipe_pid is the pid of the arv-put subprocess. # # The only functions that should access these variables directly are: @@ -1563,6 +1642,13 @@ sub fhbits # Starts an arv-put pipe, reading data on stdin and writing it to # a $logfilename file in an output collection. # +# log_writer_read_output([$timeout]) +# Read output from $log_pipe_out and append it to $log_pipe_out_buf. +# Passes $timeout to the select() call, with a default of 0.01. +# Returns the result of the last read() call on $log_pipe_out, or +# -1 if read() wasn't called because select() timed out. +# Only other log_writer_* functions should need to call this. +# # log_writer_send($txt) # Writes $txt to the output log collection. # @@ -1573,25 +1659,40 @@ sub fhbits # Returns a true value if there is currently a live arv-put # process, false otherwise. # -my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); +my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select, + $log_pipe_pid); sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', - '--portable-data-hash', - '--project-uuid', $Job->{owner_uuid}, + '--stream', '--retries', '3', - '--name', $logfilename, '--filename', $logfilename, '-'); + $log_pipe_out_buf = ""; + $log_pipe_out_select = IO::Select->new($log_pipe_out); +} + +sub log_writer_read_output { + my $timeout = shift || 0.01; + my $read = -1; + while ($read && $log_pipe_out_select->can_read($timeout)) { + $read = read($log_pipe_out, $log_pipe_out_buf, 65536, + length($log_pipe_out_buf)); + } + if (!defined($read)) { + Log(undef, "error reading log manifest from arv-put: $!"); + } + return $read; } sub log_writer_send($) { my $txt = shift; print $log_pipe_in $txt; + log_writer_read_output(); } sub log_writer_finish() @@ -1599,22 +1700,28 @@ sub log_writer_finish() return unless $log_pipe_pid; close($log_pipe_in); - my $arv_put_output; - my $s = IO::Select->new($log_pipe_out); - if ($s->can_read(120)) { - sysread($log_pipe_out, $arv_put_output, 1024); - chomp($arv_put_output); - } else { + my $logger_failed = 0; + my $read_result = log_writer_read_output(120); + if ($read_result == -1) { + $logger_failed = -1; Log (undef, "timed out reading from 'arv-put'"); + } elsif ($read_result != 0) { + $logger_failed = -2; + Log(undef, "failed to read arv-put log manifest to EOF"); } waitpid($log_pipe_pid, 0); - $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; if ($?) { - Log("log_writer_finish: arv-put exited ".exit_status_s($?)) + $logger_failed ||= $?; + Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?)) } + close($log_pipe_out); + my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf; + $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf = + $log_pipe_out_select = undef; + return $arv_put_output; } @@ -1677,11 +1784,22 @@ sub save_meta my $justcheckpoint = shift; # false if this will be the last meta saved return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm return unless log_writer_is_active(); + my $log_manifest = log_writer_finish(); + return unless defined($log_manifest); + + if ($Job->{log}) { + my $prev_log_coll = api_call("collections/get", uuid => $Job->{log}); + $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest; + } - my $loglocator = log_writer_finish(); - Log (undef, "log manifest is $loglocator"); - $Job->{'log'} = $loglocator; - $Job->update_attributes('log', $loglocator); + my $log_coll = api_call( + "collections/create", ensure_unique_name => 1, collection => { + manifest_text => $log_manifest, + owner_uuid => $Job->{owner_uuid}, + name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}), + }); + Log(undef, "log collection is " . $log_coll->{portable_data_hash}); + $Job->update_attributes('log' => $log_coll->{portable_data_hash}); } @@ -1979,7 +2097,7 @@ sub set_nonblocking { } __DATA__ -#!/usr/bin/perl +#!/usr/bin/env perl # # This is crunch-job's internal dispatch script. crunch-job running on the API # server invokes this script on individual compute nodes, or localhost if we're @@ -2145,13 +2263,28 @@ if (-d $sdk_root) { } my $python_dir = "$install_dir/python"; -if ((-d $python_dir) and can_run("python2.7") and - (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) { - # egg_info failed, probably when it asked git for a build tag. - # Specify no build tag. - open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); - print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; - close($pysdk_cfg); +if ((-d $python_dir) and can_run("python2.7")) { + open(my $egg_info_pipe, "-|", + "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null"); + my @egg_info_errors = <$egg_info_pipe>; + close($egg_info_pipe); + + if ($?) { + if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) { + # egg_info apparently failed because it couldn't ask git for a build tag. + # Specify no build tag. + open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); + print $pysdk_cfg "\n[egg_info]\ntag_build =\n"; + close($pysdk_cfg); + } else { + my $egg_info_exit = $? >> 8; + foreach my $errline (@egg_info_errors) { + warn $errline; + } + warn "python setup.py egg_info failed: exit $egg_info_exit"; + exit ($egg_info_exit || 1); + } + } } # Hide messages from the install script (unless it fails: shell_or_die