X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ad7679cfe57733940f8461097ee01bfd97997ce6..a79deca1f43e0c8854820ec77a1663d8bbf9a034:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index b38efdc53e..b2c14d5b71 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -1,4 +1,4 @@ -#!/usr/bin/perl +#!/usr/bin/env perl # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- =head1 NAME @@ -126,7 +126,7 @@ my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; -my $docker_bin = "/usr/bin/docker.io"; +my $docker_bin = "docker.io"; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, @@ -169,8 +169,7 @@ if ($jobspec =~ /^[-a-z\d]+$/) } else { - $Job = JSON::decode_json($jobspec); - $local_job = 1; + $local_job = JSON::decode_json($jobspec); } @@ -178,7 +177,7 @@ else # at least able to run basic commands: they aren't down or severely # misconfigured. my $cmd = ['true']; -if ($Job->{docker_image_locator}) { +if (($Job || $local_job)->{docker_image_locator}) { $cmd = [$docker_bin, 'ps', '-q']; } Log(undef, "Sanity check is `@$cmd`"); @@ -208,15 +207,15 @@ else { if (!$resume_stash) { - map { croak ("No $_ specified") unless $Job->{$_} } + map { croak ("No $_ specified") unless $local_job->{$_} } qw(script script_version script_parameters); } - $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; - $Job->{'started_at'} = gmtime; - $Job->{'state'} = 'Running'; + $local_job->{'is_locked_by_uuid'} = $User->{'uuid'}; + $local_job->{'started_at'} = gmtime; + $local_job->{'state'} = 'Running'; - $Job = api_call("jobs/create", job => $Job); + $Job = api_call("jobs/create", job => $local_job); } $job_id = $Job->{'uuid'}; @@ -391,12 +390,12 @@ if (!defined $no_clear_tmp) { my $cleanpid = fork(); if ($cleanpid == 0) { - # Find FUSE mounts that look like Keep mounts (the mount path has the - # word "keep") and unmount them. Then clean up work directories. - # TODO: When #5036 is done and widely deployed, we can get rid of the - # regular expression and just unmount everything with type fuse.keep. + # Find FUSE mounts under $CRUNCH_TMP and unmount them. + # Then clean up work directories. + # TODO: When #5036 is done and widely deployed, we can limit mount's + # -t option to simply fuse.keep. srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); + ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']); exit (1); } while (1) @@ -405,11 +404,14 @@ if (!defined $no_clear_tmp) { freeze_if_want_freeze ($cleanpid); select (undef, undef, undef, 0.1); } - Log (undef, "Cleanup command exited ".exit_status_s($?)); + if ($?) { + Log(undef, "Clean work dirs: exit ".exit_status_s($?)); + exit(EX_RETRY_UNLOCKED); + } } # If this job requires a Docker image, install that. -my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem); +my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg); if ($docker_locator = $Job->{docker_image_locator}) { ($docker_stream, $docker_hash) = find_docker_image($docker_locator); if (!$docker_hash) @@ -447,6 +449,42 @@ fi {fork => 1}); $docker_limitmem = ($? == 0); + # Find a non-root Docker user to use. + # Tries the default user for the container, then 'crunch', then 'nobody', + # testing for whether the actual user id is non-zero. This defends against + # mistakes but not malice, but we intend to harden the security in the future + # so we don't want anyone getting used to their jobs running as root in their + # Docker containers. + my @tryusers = ("", "crunch", "nobody"); + foreach my $try_user (@tryusers) { + my $try_user_arg; + if ($try_user eq "") { + Log(undef, "Checking if container default user is not UID 0"); + $try_user_arg = ""; + } else { + Log(undef, "Checking if user '$try_user' is not UID 0"); + $try_user_arg = "--user=$try_user"; + } + srun(["srun", "--nodelist=" . $node[0]], + ["/bin/sh", "-ec", + "a=`$docker_bin run $try_user_arg $docker_hash id --user` && " . + " test \$a -ne 0"], + {fork => 1}); + if ($? == 0) { + $dockeruserarg = $try_user_arg; + if ($try_user eq "") { + Log(undef, "Container will run with default user"); + } else { + Log(undef, "Container will run with $dockeruserarg"); + } + last; + } + } + + if (!defined $dockeruserarg) { + croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container."); + } + if ($Job->{arvados_sdk_version}) { # The job also specifies an Arvados SDK version. Add the SDKs to the # tar file for the build script to install. @@ -596,7 +634,7 @@ else { unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) { croak("`$gitcmd rev-list` exited " .exit_status_s($?) - .", '$treeish' not found. Giving up."); + .", '$treeish' not found, giving up"); } $commit = $1; Log(undef, "Version $treeish is commit $commit"); @@ -782,6 +820,9 @@ update_progress_stats(); THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { + # Don't create new tasks if we already know the job's final result. + last if defined($main::success); + my $id = $jobstep_todo[$todo_ptr]; my $Jobstep = $jobstep[$id]; if ($Jobstep->{level} != $level) @@ -842,6 +883,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); + + my $stdbuf = " stdbuf --output=0 --error=0 "; + + my $arv_file_cache = ""; + if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) { + $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024); + } + my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " @@ -852,12 +901,13 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) .q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' {"script"}; + + if ($Job->{arvados_sdk_version}) { + $command .= $stdbuf; + $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E"; + } else { + $command .= "/bin/sh -c \'python -c " . + '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' . + ">&2 2>/dev/null; " . + "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " . + "if which stdbuf >/dev/null ; then " . + " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" . + " else " . + " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" . + " fi\'"; + } } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; - $command .= "stdbuf --output=0 --error=0 "; + $command .= $stdbuf; $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } @@ -973,7 +1029,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) || ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { - last THISROUND if $main::please_freeze || defined($main::success); + last THISROUND if $main::please_freeze; if ($main::please_info) { $main::please_info = 0; @@ -985,14 +1041,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my $gotsome = readfrompipes () + reapchildren (); - if (!$gotsome) + if (!$gotsome || ($latest_refresh + 2 < scalar time)) { check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); } - elsif (time - $progress_stats_updated >= 30) + elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty) { update_progress_stats(); } @@ -1099,8 +1155,8 @@ sub update_progress_stats $progress_stats_updated = time; return if !$progress_is_dirty; my ($todo, $done, $running) = (scalar @jobstep_todo, - scalar @jobstep_done, - scalar @slot - scalar @freeslot - scalar @holdslot); + scalar @jobstep_done, + scalar keys(%proc)); $Job->{'tasks_summary'} ||= {}; $Job->{'tasks_summary'}->{'todo'} = $todo; $Job->{'tasks_summary'}->{'done'} = $done; @@ -1134,6 +1190,9 @@ sub reapchildren if (!defined $task_success) { # task did not indicate one way or the other --> fail + Log($jobstepid, sprintf( + "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.", + exit_status_s($childstatus))); $Jobstep->{'arvados_task'}->{success} = 0; $Jobstep->{'arvados_task'}->save; $task_success = 0; @@ -1650,20 +1709,24 @@ sub log_writer_finish() close($log_pipe_in); + my $logger_failed = 0; my $read_result = log_writer_read_output(120); if ($read_result == -1) { + $logger_failed = -1; Log (undef, "timed out reading from 'arv-put'"); } elsif ($read_result != 0) { + $logger_failed = -2; Log(undef, "failed to read arv-put log manifest to EOF"); } waitpid($log_pipe_pid, 0); if ($?) { + $logger_failed ||= $?; Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?)) } close($log_pipe_out); - my $arv_put_output = $log_pipe_out_buf; + my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf; $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf = $log_pipe_out_select = undef; @@ -1729,13 +1792,13 @@ sub save_meta my $justcheckpoint = shift; # false if this will be the last meta saved return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm return unless log_writer_is_active(); + my $log_manifest = log_writer_finish(); + return unless defined($log_manifest); - my $log_manifest = ""; if ($Job->{log}) { my $prev_log_coll = api_call("collections/get", uuid => $Job->{log}); - $log_manifest .= $prev_log_coll->{manifest_text}; + $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest; } - $log_manifest .= log_writer_finish(); my $log_coll = api_call( "collections/create", ensure_unique_name => 1, collection => { @@ -2042,7 +2105,7 @@ sub set_nonblocking { } __DATA__ -#!/usr/bin/perl +#!/usr/bin/env perl # # This is crunch-job's internal dispatch script. crunch-job running on the API # server invokes this script on individual compute nodes, or localhost if we're @@ -2114,10 +2177,11 @@ if (@ARGV) { $Log->("Built Python SDK virtualenv"); } - my $pip_bin = "pip"; + my @pysdk_version_cmd = ("python", "-c", + "from pkg_resources import get_distribution as get; print get('arvados-python-client').version"); if ($venv_built) { $Log->("Running in Python SDK virtualenv"); - $pip_bin = "$venv_dir/bin/pip"; + @pysdk_version_cmd = (); my $orig_argv = join(" ", map { quotemeta($_); } @ARGV); @ARGV = ("/bin/sh", "-ec", ". \Q$venv_dir/bin/activate\E; exec $orig_argv"); @@ -2126,14 +2190,18 @@ if (@ARGV) { "\$PATH. Can't install Python SDK."); } - my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`; - if ($pkgs) { - $Log->("Using Arvados SDK:"); - foreach my $line (split /\n/, $pkgs) { - $Log->($line); + if (@pysdk_version_cmd) { + open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd); + my $pysdk_version = <$pysdk_version_pipe>; + close($pysdk_version_pipe); + if ($? == 0) { + chomp($pysdk_version); + $Log->("Using Arvados SDK version $pysdk_version"); + } else { + # A lot could've gone wrong here, but pretty much all of it means that + # Python won't be able to load the Arvados SDK. + $Log->("Warning: Arvados SDK not found"); } - } else { - $Log->("Arvados SDK packages not found"); } while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) { @@ -2210,11 +2278,12 @@ if (-d $sdk_root) { my $python_dir = "$install_dir/python"; if ((-d $python_dir) and can_run("python2.7")) { open(my $egg_info_pipe, "-|", - "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null"); + "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null"); my @egg_info_errors = <$egg_info_pipe>; close($egg_info_pipe); + if ($?) { - if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) { + if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) { # egg_info apparently failed because it couldn't ask git for a build tag. # Specify no build tag. open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg"); @@ -2223,7 +2292,7 @@ if ((-d $python_dir) and can_run("python2.7")) { } else { my $egg_info_exit = $? >> 8; foreach my $errline (@egg_info_errors) { - print STDERR_ORIG $errline; + warn $errline; } warn "python setup.py egg_info failed: exit $egg_info_exit"; exit ($egg_info_exit || 1);