X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80..82225f2eeb39f6798ff83e979c28698ff617d414:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index b63886e105..86e018cc99 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -126,6 +126,7 @@ my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; +my $cgroup_root = "/sys/fs/cgroup"; my $docker_bin = "docker.io"; my $docker_run_args = ""; GetOptions('force-unlock' => \$force_unlock, @@ -134,6 +135,7 @@ GetOptions('force-unlock' => \$force_unlock, 'job-api-token=s' => \$job_api_token, 'no-clear-tmp' => \$no_clear_tmp, 'resume-stash=s' => \$resume_stash, + 'cgroup-root=s' => \$cgroup_root, 'docker-bin=s' => \$docker_bin, 'docker-run-args=s' => \$docker_run_args, ); @@ -430,7 +432,7 @@ fi # Determine whether this version of Docker supports memory+swap limits. ($exited, $stdout, $stderr) = srun_sync( - ["srun", "--nodelist=" . $node[0]], + ["srun", "--nodes=1"], [$docker_bin, 'run', '--help'], {label => "check --memory-swap feature"}); $docker_limitmem = ($stdout =~ /--memory-swap/); @@ -453,7 +455,7 @@ fi $try_user_arg = "--user=$try_user"; } my ($exited, $stdout, $stderr) = srun_sync( - ["srun", "--nodelist=" . $node[0]], + ["srun", "--nodes=1"], ["/bin/sh", "-ec", "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"], {label => $label}); @@ -747,6 +749,7 @@ if ($initial_tasks_this_level < @node) { @freeslot = (0..$#slot); } my $round_num_freeslots = scalar(@freeslot); +print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n"; my %round_max_slots = (); for (my $ii = $#freeslot; $ii >= 0; $ii--) { @@ -849,7 +852,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' /dev/null ; then VOLUME_CRUNCHRUNNER=\"--volume=\$(which crunchrunner):/usr/local/bin/crunchrunner\" ; fi " + ."&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUME_CERTS=\"--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt\" ; fi " + ."&& if test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUME_CERTS=\"--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt\" ; fi "; $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec "; $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh"; @@ -859,7 +865,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}"; my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid"; - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; + $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; $command .= "$docker_bin run $docker_run_args --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy "; # We only set memory limits if Docker lets us limit both memory and swap. # Memory limits alone have been supported longer, but subprocesses tend @@ -914,6 +920,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) # For now, use the same approach as TASK_WORK above. $ENV{"JOB_WORK"} = "/tmp/crunch-job-work"; + # Bind mount the crunchrunner binary and host TLS certificates file into + # the container. + $command .= "\"\$VOLUME_CRUNCHRUNNER\" \"\$VOLUME_CERTS\" "; + while (my ($env_key, $env_val) = each %ENV) { if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) { @@ -939,7 +949,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } } else { # Non-docker run - $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; + $command .= "crunchstat -cgroup-root=\Q$cgroup_root\E -poll=10000 "; $command .= $stdbuf; $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } @@ -1130,109 +1140,119 @@ sub update_progress_stats sub reapchildren { - my $pid = waitpid (-1, WNOHANG); - return 0 if $pid <= 0; - - my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} - . "." - . $slot[$proc{$pid}->{slot}]->{cpu}); - my $jobstepidx = $proc{$pid}->{jobstepidx}; - my $elapsed = time - $proc{$pid}->{time}; - my $Jobstep = $jobstep[$jobstepidx]; - - my $childstatus = $?; - my $exitvalue = $childstatus >> 8; - my $exitinfo = "exit ".exit_status_s($childstatus); - $Jobstep->{'arvados_task'}->reload; - my $task_success = $Jobstep->{'arvados_task'}->{success}; - - Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success"); - - if (!defined $task_success) { - # task did not indicate one way or the other --> fail - Log($jobstepidx, sprintf( - "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", - exit_status_s($childstatus))); - $Jobstep->{'arvados_task'}->{success} = 0; - $Jobstep->{'arvados_task'}->save; - $task_success = 0; - } + my $children_reaped = 0; + my @successful_task_uuids = (); - if (!$task_success) + while((my $pid = waitpid (-1, WNOHANG)) > 0) { - my $temporary_fail; - $temporary_fail ||= $Jobstep->{tempfail}; - $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); - - ++$thisround_failed; - ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; - - # Check for signs of a failed or misconfigured node - if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= - 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { - # Don't count this against jobstep failure thresholds if this - # node is already suspected faulty and srun exited quickly - if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && - $elapsed < 5) { - Log ($jobstepidx, "blaming failure on suspect node " . - $slot[$proc{$pid}->{slot}]->{node}->{name}); - $temporary_fail ||= 1; - } - ban_node_by_slot($proc{$pid}->{slot}); + my $childstatus = $?; + + my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} + . "." + . $slot[$proc{$pid}->{slot}]->{cpu}); + my $jobstepidx = $proc{$pid}->{jobstepidx}; + + $children_reaped++; + my $elapsed = time - $proc{$pid}->{time}; + my $Jobstep = $jobstep[$jobstepidx]; + + my $exitvalue = $childstatus >> 8; + my $exitinfo = "exit ".exit_status_s($childstatus); + $Jobstep->{'arvados_task'}->reload; + my $task_success = $Jobstep->{'arvados_task'}->{success}; + + Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success"); + + if (!defined $task_success) { + # task did not indicate one way or the other --> fail + Log($jobstepidx, sprintf( + "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.", + exit_status_s($childstatus))); + $Jobstep->{'arvados_task'}->{success} = 0; + $Jobstep->{'arvados_task'}->save; + $task_success = 0; } - Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds', - ++$Jobstep->{'failures'}, - $temporary_fail ? 'temporary' : 'permanent', - $elapsed)); + if (!$task_success) + { + my $temporary_fail; + $temporary_fail ||= $Jobstep->{tempfail}; + $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL); + + ++$thisround_failed; + ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; + + # Check for signs of a failed or misconfigured node + if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= + 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { + # Don't count this against jobstep failure thresholds if this + # node is already suspected faulty and srun exited quickly + if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && + $elapsed < 5) { + Log ($jobstepidx, "blaming failure on suspect node " . + $slot[$proc{$pid}->{slot}]->{node}->{name}); + $temporary_fail ||= 1; + } + ban_node_by_slot($proc{$pid}->{slot}); + } - if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { - # Give up on this task, and the whole job - $main::success = 0; + Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds', + ++$Jobstep->{'failures'}, + $temporary_fail ? 'temporary' : 'permanent', + $elapsed)); + + if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { + # Give up on this task, and the whole job + $main::success = 0; + } + # Put this task back on the todo queue + push @jobstep_todo, $jobstepidx; + $Job->{'tasks_summary'}->{'failed'}++; } - # Put this task back on the todo queue - push @jobstep_todo, $jobstepidx; - $Job->{'tasks_summary'}->{'failed'}++; - } - else - { - ++$thisround_succeeded; - $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; - $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; - push @jobstep_done, $jobstepidx; - Log ($jobstepidx, "success in $elapsed seconds"); - } - $Jobstep->{exitcode} = $childstatus; - $Jobstep->{finishtime} = time; - $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); - $Jobstep->{'arvados_task'}->save; - process_stderr_final ($jobstepidx); - Log ($jobstepidx, sprintf("task output (%d bytes): %s", - length($Jobstep->{'arvados_task'}->{output}), - $Jobstep->{'arvados_task'}->{output})); + else # task_success + { + push @successful_task_uuids, $Jobstep->{'arvados_task'}->{uuid}; + ++$thisround_succeeded; + $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; + $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0; + push @jobstep_done, $jobstepidx; + Log ($jobstepidx, "success in $elapsed seconds"); + } + $Jobstep->{exitcode} = $childstatus; + $Jobstep->{finishtime} = time; + $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); + $Jobstep->{'arvados_task'}->save; + process_stderr_final ($jobstepidx); + Log ($jobstepidx, sprintf("task output (%d bytes): %s", + length($Jobstep->{'arvados_task'}->{output}), + $Jobstep->{'arvados_task'}->{output})); - close $reader{$jobstepidx}; - delete $reader{$jobstepidx}; - delete $slot[$proc{$pid}->{slot}]->{pid}; - push @freeslot, $proc{$pid}->{slot}; - delete $proc{$pid}; + close $reader{$jobstepidx}; + delete $reader{$jobstepidx}; + delete $slot[$proc{$pid}->{slot}]->{pid}; + push @freeslot, $proc{$pid}->{slot}; + delete $proc{$pid}; + + $progress_is_dirty = 1; + } - if ($task_success) { + if (scalar(@successful_task_uuids) > 0) + { + Log (undef, sprintf("%d tasks exited (%d succeeded), checking for new tasks from API server.", $children_reaped, scalar(@successful_task_uuids))); # Load new tasks my $newtask_list = []; my $newtask_results; do { $newtask_results = api_call( "job_tasks/list", - 'where' => { - 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} - }, + 'filters' => [["created_by_job_task_uuid","in",\@successful_task_uuids]], 'order' => 'qsequence', 'offset' => scalar(@$newtask_list), - ); + ); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); + Log (undef, sprintf("Got %d new tasks from API server.", scalar(@$newtask_list))); foreach my $arvados_task (@$newtask_list) { my $jobstep = { 'level' => $arvados_task->{'sequence'}, @@ -1244,8 +1264,7 @@ sub reapchildren } } - $progress_is_dirty = 1; - 1; + return $children_reaped; } sub check_refresh_wanted @@ -1292,10 +1311,13 @@ sub check_squeue # squeue check interval (15s) this should make the squeue check an # infrequent event. my $silent_procs = 0; - for my $procinfo (values %proc) + for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc) { - my $jobstep = $jobstep[$procinfo->{jobstepidx}]; - if ($jobstep->{stderr_at} < $last_squeue_check) + if (!exists($js->{stderr_at})) + { + $js->{stderr_at} = 0; + } + if ($js->{stderr_at} < $last_squeue_check) { $silent_procs++; } @@ -1305,14 +1327,14 @@ sub check_squeue # use killem() on procs whose killtime is reached while (my ($pid, $procinfo) = each %proc) { - my $jobstep = $jobstep[$procinfo->{jobstepidx}]; + my $js = $jobstep[$procinfo->{jobstepidx}]; if (exists $procinfo->{killtime} && $procinfo->{killtime} <= time - && $jobstep->{stderr_at} < $last_squeue_check) + && $js->{stderr_at} < $last_squeue_check) { my $sincewhen = ""; - if ($jobstep->{stderr_at}) { - $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s"; + if ($js->{stderr_at}) { + $sincewhen = " in last " . (time - $js->{stderr_at}) . "s"; } Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)"); killem ($pid); @@ -1383,27 +1405,37 @@ sub release_allocation sub readfrompipes { my $gotsome = 0; + my %fd_job; + my $sel = IO::Select->new(); foreach my $jobstepidx (keys %reader) + { + my $fd = $reader{$jobstepidx}; + $sel->add($fd); + $fd_job{$fd} = $jobstepidx; + + if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) { + $sel->add($stdout_fd); + $fd_job{$stdout_fd} = $jobstepidx; + } + } + # select on all reader fds with 0.1s timeout + my @ready_fds = $sel->can_read(0.1); + foreach my $fd (@ready_fds) { my $buf; - if ($jobstep[$jobstepidx]->{stdout_r} && - 0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536)) + if (0 < sysread ($fd, $buf, 65536)) { + $gotsome = 1; print STDERR $buf if $ENV{CRUNCH_DEBUG}; - if (exists $jobstep[$jobstepidx]->{stdout_captured}) { + + my $jobstepidx = $fd_job{$fd}; + if ($jobstep[$jobstepidx]->{stdout_r} == $fd) { $jobstep[$jobstepidx]->{stdout_captured} .= $buf; + next; } - $gotsome = 1; - } - if (0 < sysread ($reader{$jobstepidx}, $buf, 65536)) - { - print STDERR $buf if $ENV{CRUNCH_DEBUG}; + $jobstep[$jobstepidx]->{stderr_at} = time; $jobstep[$jobstepidx]->{stderr} .= $buf; - if (exists $jobstep[$jobstepidx]->{stderr_captured}) { - $jobstep[$jobstepidx]->{stderr_captured} .= $buf; - } - $gotsome = 1; # Consume everything up to the last \n preprocess_stderr ($jobstepidx); @@ -1441,7 +1473,6 @@ sub preprocess_stderr # attached to a particular worker slot. } elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) { - my $job_slot_index = $jobstep[$jobstepidx]->{slotindex}; my $job_slot_index = $jobstep[$jobstepidx]->{slotindex}; $slot[$job_slot_index]->{node}->{fail_count}++; $jobstep[$jobstepidx]->{tempfail} = 1; @@ -1451,7 +1482,7 @@ sub preprocess_stderr $jobstep[$jobstepidx]->{tempfail} = 1; ban_node_by_slot($jobstep[$jobstepidx]->{slotindex}); } - elsif ($line =~ /arvados\.errors\.Keep/) { + elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) { $jobstep[$jobstepidx]->{tempfail} = 1; } }