my $cleanpid = fork();
if ($cleanpid == 0)
{
- # Find FUSE mounts that look like Keep mounts (the mount path has the
- # word "keep") and unmount them. Then clean up work directories.
- # TODO: When #5036 is done and widely deployed, we can get rid of the
- # regular expression and just unmount everything with type fuse.keep.
+ # Find FUSE mounts under $CRUNCH_TMP and unmount them.
+ # Then clean up work directories.
+ # TODO: When #5036 is done and widely deployed, we can limit mount's
+ # -t option to simply fuse.keep.
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
+ # Match mounts at $CRUNCH_TMP itself or strictly below it; a bare prefix
+ # test would also match sibling directories such as "$CRUNCH_TMP-1000".
+ ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(\$3 == \"$CRUNCH_TMP\" || index(\$3, \"$CRUNCH_TMP/\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
exit (1);
}
while (1)
}
# If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{fork => 1});
$docker_limitmem = ($? == 0);
+ # Find a non-root Docker user to use.
+ # Tries the default user for the container, then 'crunch', then 'nobody',
+ # testing for whether the actual user id is non-zero. This defends against
+ # mistakes but not malice, but we intend to harden the security in the future
+ # so we don't want anyone getting used to their jobs running as root in their
+ # Docker containers.
+ # On success $dockeruserarg holds the docker-run argument to use ("" for the
+ # image's default user, otherwise "--user=NAME"); if no candidate works we croak.
+ my @tryusers = ("", "crunch", "nobody");
+ foreach my $try_user (@tryusers) {
+ my $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Checking if container default user is not UID 0");
+ $try_user_arg = "";
+ } else {
+ Log(undef, "Checking if user '$try_user' is not UID 0");
+ $try_user_arg = "--user=$try_user";
+ }
+ # Each probe starts a throwaway container; --rm=true removes it on exit so
+ # the probes don't leave stopped containers behind on the node.
+ srun(["srun", "--nodelist=" . $node[0]],
+ ["/bin/sh", "-ec",
+ "a=`$docker_bin run --rm=true $try_user_arg $docker_hash id --user` && " .
+ " test \$a -ne 0"],
+ {fork => 1});
+ if ($? == 0) {
+ $dockeruserarg = $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Container will run with default user");
+ } else {
+ Log(undef, "Container will run with $dockeruserarg");
+ }
+ last;
+ }
+ }
+
+ if (!defined $dockeruserarg) {
+ # @tryusers includes "" for the default user, which is already named in
+ # the message; list only the explicit user names after it.
+ croak("Could not find a user in container that is not UID 0 (tried default user, " .
+ join(", ", grep { $_ ne "" } @tryusers) .
+ ") or there was a problem running 'id' in the container.");
+ }
+
if ($Job->{arvados_sdk_version}) {
# The job also specifies an Arvados SDK version. Add the SDKs to the
# tar file for the build script to install.
THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
+ # Don't create new tasks if we already know the job's final result.
+ last if defined($main::success);
+
my $id = $jobstep_todo[$todo_ptr];
my $Jobstep = $jobstep[$id];
if ($Jobstep->{level} != $level)
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
+
+ my $stdbuf = " stdbuf --output=0 --error=0 ";
+
+ # Optional per-task Keep cache size for arv-mount, given in MiB by the job's
+ # runtime_constraints and passed to arv-mount in bytes. The value comes from
+ # the job record and is interpolated unquoted into a shell command that runs
+ # on the host, so force it to an integer byte count instead of trusting its
+ # text (this also avoids passing a fractional --file-cache value).
+ my $arv_file_cache = "";
+ if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
+ $arv_file_cache = "--file-cache=" . sprintf("%d", $Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
+ }
+
my $command =
"if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP ";
- $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
+ $command .= "&& exec arv-mount --by-pdh --crunchstat-interval=10 --allow-other $arv_file_cache $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
- my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
+ my $containername = "$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$containername.cid";
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+ $command .= "$docker_bin run --name=$containername --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
# We only set memory limits if Docker lets us limit both memory and swap.
# Memory limits alone have been supported longer, but subprocesses tend
# to get SIGKILL if they exceed that without any swap limit set.
}
$command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
- $command .= "stdbuf --output=0 --error=0 ";
- $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+ if ($Job->{arvados_sdk_version}) {
+ $command .= $stdbuf;
+ $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+ } else {
+ $command .= "/bin/sh -c \'python -c " .
+ '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
+ ">&2 2>/dev/null; " .
+ "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+ "if which stdbuf >/dev/null ; then " .
+ " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " else " .
+ " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " fi\'";
+ }
} else {
# Non-docker run
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
- $command .= "stdbuf --output=0 --error=0 ";
+ $command .= $stdbuf;
$command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
||
($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
{
- last THISROUND if $main::please_freeze || defined($main::success);
+ last THISROUND if $main::please_freeze;
if ($main::please_info)
{
$main::please_info = 0;
my $gotsome
= readfrompipes ()
+ reapchildren ();
- if (!$gotsome)
+ if (!$gotsome || ($latest_refresh + 2 < scalar time))
{
check_refresh_wanted();
check_squeue();
if (!defined $task_success) {
# task did not indicate one way or the other --> fail
+ Log($jobstepid, sprintf(
+ "ERROR: Task process exited %d, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
$Jobstep->{'arvados_task'}->save;
$task_success = 0;
close($log_pipe_in);
+ my $logger_failed = 0;
my $read_result = log_writer_read_output(120);
if ($read_result == -1) {
+ $logger_failed = -1;
Log (undef, "timed out reading from 'arv-put'");
} elsif ($read_result != 0) {
+ $logger_failed = -2;
Log(undef, "failed to read arv-put log manifest to EOF");
}
waitpid($log_pipe_pid, 0);
if ($?) {
+ $logger_failed ||= $?;
Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
}
close($log_pipe_out);
- my $arv_put_output = $log_pipe_out_buf;
+ my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;
$log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
$log_pipe_out_select = undef;
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
return unless log_writer_is_active();
+ my $log_manifest = log_writer_finish();
+ return unless defined($log_manifest);
- my $log_manifest = "";
if ($Job->{log}) {
my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
- $log_manifest .= $prev_log_coll->{manifest_text};
+ $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
}
- $log_manifest .= log_writer_finish();
my $log_coll = api_call(
"collections/create", ensure_unique_name => 1, collection => {
$Log->("Built Python SDK virtualenv");
}
- my $pip_bin = "pip";
+ my @pysdk_version_cmd = ("python", "-c",
+ "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
if ($venv_built) {
$Log->("Running in Python SDK virtualenv");
- $pip_bin = "$venv_dir/bin/pip";
+ @pysdk_version_cmd = ();
my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
@ARGV = ("/bin/sh", "-ec",
". \Q$venv_dir/bin/activate\E; exec $orig_argv");
"\$PATH. Can't install Python SDK.");
}
- my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
- if ($pkgs) {
- $Log->("Using Arvados SDK:");
- foreach my $line (split /\n/, $pkgs) {
- $Log->($line);
+ if (@pysdk_version_cmd) {
+ # Ask Python which arvados-python-client it can import and log it.
+ # $pysdk_version stays undef if python can't be started, exits non-zero,
+ # or prints nothing.
+ my $pysdk_version;
+ if (open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd)) {
+ $pysdk_version = <$pysdk_version_pipe>;
+ # close() on a pipe reaps the child and returns false when it exited
+ # non-zero (e.g. the SDK module could not be imported).
+ close($pysdk_version_pipe) or $pysdk_version = undef;
+ }
+ if (defined($pysdk_version)) {
+ chomp($pysdk_version);
+ $Log->("Using Arvados SDK version $pysdk_version");
+ } else {
+ # A lot could've gone wrong here, but pretty much all of it means that
+ # Python won't be able to load the Arvados SDK.
+ $Log->("Warning: Arvados SDK not found");
}
- } else {
- $Log->("Arvados SDK packages not found");
}
while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
my $python_dir = "$install_dir/python";
if ((-d $python_dir) and can_run("python2.7")) {
open(my $egg_info_pipe, "-|",
- "python2.7 \Q$python_dir/setup.py\E --quiet egg_info 2>&1 >/dev/null");
+ "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
my @egg_info_errors = <$egg_info_pipe>;
close($egg_info_pipe);
+
if ($?) {
- if (@egg_info_errors and ($egg_info_errors[-1] =~ /\bgit\b/)) {
+ if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
# egg_info apparently failed because it couldn't ask git for a build tag.
# Specify no build tag.
open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
} else {
my $egg_info_exit = $? >> 8;
foreach my $errline (@egg_info_errors) {
- print STDERR_ORIG $errline;
+ warn $errline;
}
warn "python setup.py egg_info failed: exit $egg_info_exit";
exit ($egg_info_exit || 1);