X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e999e4e19e5619b485f5c4a2d1e993e187d63975..308a6da1a9fd716f3957b116110a932c08aefafe:/sdk/cli/bin/crunch-job

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 369bc3e1ae..081d745a5b 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -86,6 +86,7 @@ use POSIX ':sys_wait_h';
 use POSIX qw(strftime);
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Arvados;
+use Data::Dumper;
 use Digest::MD5 qw(md5_hex);
 use Getopt::Long;
 use IPC::Open2;
@@ -156,20 +157,15 @@ my $dbh;
 my $sth;
 my @jobstep;
 
-my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+my $User = api_call("users/current");
 
 if ($jobspec =~ /^[-a-z\d]+$/)
 {
   # $jobspec is an Arvados UUID, not a JSON job specification
-  $Job = retry_op(sub {
-    $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-  });
+  $Job = api_call("jobs/get", uuid => $jobspec);
   if (!$force_unlock) {
     # Claim this job, and make sure nobody else does
-    eval { retry_op(sub {
-      # lock() sets is_locked_by_uuid and changes state to Running.
-      $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
-    }); };
+    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
     if ($@) {
       Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
       exit EX_TEMPFAIL;
@@ -190,7 +186,7 @@ else
   $Job->{'started_at'} = gmtime;
   $Job->{'state'} = 'Running';
 
-  $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
+  $Job = api_call("jobs/create", job => $Job);
 }
 $job_id = $Job->{'uuid'};
 
@@ -320,13 +316,11 @@ if (defined $Job->{thawedfromkey})
 }
 else
 {
-  my $first_task = retry_op(sub {
-    $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
-      'job_uuid' => $Job->{'uuid'},
-      'sequence' => 0,
-      'qsequence' => 0,
-      'parameters' => {},
-    });
+  my $first_task = api_call("job_tasks/create", job_task => {
+    'job_uuid' => $Job->{'uuid'},
+    'sequence' => 0,
+    'qsequence' => 0,
+    'parameters' => {},
   });
   push @jobstep, { 'level' => 0,
                    'failures' => 0,
@@ -357,7 +351,7 @@ if (!defined $no_clear_tmp) {
   if ($cleanpid == 0)
   {
     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+          ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
     exit (1);
   }
   while (1)
@@ -426,10 +420,8 @@ else {
   } else {
     # $repo is none of the above. It must be the name of a hosted
     # repository.
-    my $arv_repo_list = retry_op(sub {
-      $arv->{'repositories'}->{'list'}->execute(
-        'filters' => [['name','=',$repo]]);
-    });
+    my $arv_repo_list = api_call("repositories/list",
+      'filters' => [['name','=',$repo]]);
     my @repos_found = @{$arv_repo_list->{'items'}};
     my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
     if ($n_found > 0) {
@@ -547,8 +539,6 @@ else {
   my @execargs = ("sh", "-c",
                   "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
-  # Note: this section is almost certainly unnecessary if we're
-  # running tasks in docker containers.
   my $installpid = fork();
   if ($installpid == 0)
   {
@@ -694,7 +684,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+    $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
     $ENV{"HOME"} = $ENV{"TASK_WORK"};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
@@ -723,36 +713,54 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
     if ($docker_hash)
     {
-      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
-      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
+      my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+
       # Dynamically configure the container to use the host system as its
       # DNS server. Get the host's global addresses from the ip command,
       # and turn them into docker --dns options using gawk.
      $command .=
          q{$(ip -o address show scope global |
              gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
-      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
+
+      # The source tree and $destdir directory (which we have
+      # installed on the worker host) are available in the container,
+      # under the same path.
+      $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
+      $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
+
+      # Currently, we make arv-mount's mount point appear at /keep
+      # inside the container (instead of using the same path as the
+      # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
+      # crunch scripts and utilities must not rely on this. They must
+      # use $TASK_KEEPMOUNT.
       $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
-      $command .= "--env=\QHOME=/home/crunch\E ";
+      $ENV{TASK_KEEPMOUNT} = "/keep";
+
+      # TASK_WORK is a plain docker data volume: it starts out empty,
+      # is writable, and persists until no containers use it any
+      # more. We don't use --volumes-from to share it with other
+      # containers: it is only accessible to this task, and it goes
+      # away when this task stops.
+      $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
+
+      # JOB_WORK is also a plain docker data volume for now. TODO:
+      # Share a single JOB_WORK volume across all task containers on a
+      # given worker node, and delete it when the job ends (and, in
+      # case that doesn't work, when the next job starts).
+      $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
+
       while (my ($env_key, $env_val) = each %ENV)
       {
-        if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
-          if ($env_key eq "TASK_WORK") {
-            $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
-          }
-          elsif ($env_key eq "TASK_KEEPMOUNT") {
-            $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
-          }
-          else {
-            $command .= "--env=\Q$env_key=$env_val\E ";
-          }
+        if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
+          $command .= "--env=\Q$env_key=$env_val\E ";
         }
       }
-      $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
-      $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
+      $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
       $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
+      $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
@@ -763,8 +771,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     my @execargs = ('bash', '-c', $command);
     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
     # exec() failed, we assume nothing happened.
-    Log(undef, "srun() failed on build script");
-    die;
+    die "srun() failed on build script\n";
   }
   close("writer");
   if (!defined $childpid)
@@ -919,10 +926,8 @@ else {
     while (my $manifest_line = <$orig_manifest>) {
       $orig_manifest_text .= $manifest_line;
     }
-    my $output = retry_op(sub {
-      $arv->{'collections'}->{'create'}->execute(
-        'collection' => {'manifest_text' => $orig_manifest_text});
-    });
+    my $output = api_call("collections/create", collection => {
+      'manifest_text' => $orig_manifest_text});
     Log(undef, "output uuid " . $output->{uuid});
     Log(undef, "output hash " . $output->{portable_data_hash});
     $Job->update_attributes('output' => $output->{portable_data_hash});
@@ -1056,15 +1061,14 @@ sub reapchildren
     my $newtask_list = [];
     my $newtask_results;
     do {
-      $newtask_results = retry_op(sub {
-        $arv->{'job_tasks'}->{'list'}->execute(
-          'where' => {
-            'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
-          },
-          'order' => 'qsequence',
-          'offset' => scalar(@$newtask_list),
-        );
-      });
+      $newtask_results = api_call(
+        "job_tasks/list",
+        'where' => {
+          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+        },
+        'order' => 'qsequence',
+        'offset' => scalar(@$newtask_list),
+      );
       push(@$newtask_list, @{$newtask_results->{items}});
     } while (@{$newtask_results->{items}});
     foreach my $arvados_task (@$newtask_list) {
@@ -1087,9 +1091,7 @@ sub check_refresh_wanted
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
   if (@stat && $stat[9] > $latest_refresh) {
     $latest_refresh = scalar time;
-    my $Job2 = retry_op(sub {
-      $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
-    });
+    my $Job2 = api_call("jobs/get", uuid => $jobspec);
     for my $attr ('cancelled_at',
                   'cancelled_by_user_uuid',
                   'cancelled_by_client_uuid',
@@ -1555,11 +1557,13 @@ sub srun
   my $opts = shift || {};
   my $stdin = shift;
   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
-  print STDERR (join (" ",
-                      map { / / ? "'$_'" : $_ }
-                      (@$args)),
-                "\n")
-    if $ENV{CRUNCH_DEBUG};
+
+  $Data::Dumper::Terse = 1;
+  $Data::Dumper::Indent = 0;
+  my $show_cmd = Dumper($args);
+  $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
+  $show_cmd =~ s/\n/ /g;
+  warn "starting: $show_cmd\n";
 
   if (defined $stdin) {
     my $child = open STDIN, "-|";
@@ -1602,9 +1606,7 @@ sub find_docker_image {
   # If not, return undef for both values.
   my $locator = shift;
   my ($streamname, $filename);
-  my $image = retry_op(sub {
-    $arv->{collections}->{get}->execute(uuid => $locator);
-  });
+  my $image = api_call("collections/get", uuid => $locator);
   if ($image) {
     foreach my $line (split(/\n/, $image->{manifest_text})) {
       my @tokens = split(/\s+/, $line);
@@ -1651,10 +1653,14 @@ sub retry_count {
 }
 
 sub retry_op {
-  # Given a function reference, call it with the remaining arguments. If
-  # it dies, retry it with exponential backoff until it succeeds, or until
-  # the current retry_count is exhausted.
+  # Pass in two function references.
+  # This method will be called with the remaining arguments.
+  # If it dies, retry it with exponential backoff until it succeeds,
+  # or until the current retry_count is exhausted. After each failure
+  # that can be retried, the second function will be called with
+  # the current try count (0-based), next try time, and error message.
   my $operation = shift;
+  my $retry_callback = shift;
   my $retries = retry_count();
   foreach my $try_count (0..$retries) {
     my $next_try = time + (2 ** $try_count);
@@ -1662,6 +1668,7 @@ sub retry_op {
     if (!$@) {
       return $result;
     } elsif ($try_count < $retries) {
+      $retry_callback->($try_count, $next_try, $@);
       my $sleep_time = $next_try - time;
       sleep($sleep_time) if ($sleep_time > 0);
     }
@@ -1672,6 +1679,32 @@ sub retry_op {
   die($@ . "\n");
 }
 
+sub api_call {
+  # Pass in a /-separated API method name, and arguments for it.
+  # This function will call that method, retrying as needed until
+  # the current retry_count is exhausted, with a log on the first failure.
+  my $method_name = shift;
+  my $log_api_retry = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime("%Y-%m-%d %H:%M:%S", localtime($next_try_at));
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+  };
+  my $method = $arv;
+  foreach my $key (split(/\//, $method_name)) {
+    $method = $method->{$key};
+  }
+  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
 sub exit_status_s {
   # Given a $?, return a human-readable exit code string like "0" or
   # "1" or "0 with signal 1" or "1 with signal 11".
@@ -1692,7 +1725,7 @@ __DATA__
 # checkout-and-build
 
 use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
 
 my $destdir = $ENV{"CRUNCH_SRC"};
 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
@@ -1700,12 +1733,17 @@ my $repo = $ENV{"CRUNCH_SRC_URL"};
 my $task_work = $ENV{"TASK_WORK"};
 
 for my $dir ($destdir, $task_work) {
-    if ($dir) {
-        make_path $dir;
-        -e $dir or die "Failed to create temporary directory ($dir): $!";
-    }
+  if ($dir) {
+    make_path $dir;
+    -e $dir or die "Failed to create temporary directory ($dir): $!";
+  }
+}
+
+if ($task_work) {
+  remove_tree($task_work, {keep_root => 1});
 }
+
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
@@ -1718,6 +1756,7 @@ if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
 }
 
 unlink "$destdir.commit";
+open STDERR_ORIG, ">&STDERR";
 open STDOUT, ">", "$destdir.log";
 open STDERR, ">&STDOUT";
 
@@ -1772,8 +1811,13 @@ sub shell_or_die
   if ($ENV{"DEBUG"}) {
     print STDERR "@_\n";
  }
-  system (@_) == 0
-    or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+  if (system (@_) != 0) {
+    my $err = $!;
+    my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+    open STDERR, ">&STDERR_ORIG";
+    system ("cat $destdir.log >&2");
+    die "@_ failed ($err): $exitstatus";
+  }
 }
 
 __DATA__
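The api_call()/retry_op() changes above route every Arvados API request through one shared retry-with-exponential-backoff path, with a log line each time a failed call is retried. The short standalone Perl sketch below illustrates that pattern outside crunch-job; it is not part of this diff, and the %api dispatch table, the example UUID, and the log format are invented stand-ins for the real $arv client object.

#!/usr/bin/perl
# Illustrative sketch only -- not part of crunch-job.  It mimics the
# retry_op()/api_call() pattern added in the diff, using a hypothetical
# dispatch table in place of the Arvados SDK client ($arv).
use strict;
use warnings;
use POSIX qw(strftime);

my $retry_count = 3;  # crunch-job derives this from the job's attributes

# Hypothetical "API client": a nested hash of code refs, addressed as "jobs/get".
my %api = (
    jobs => {
        get => sub { my %args = @_; return { uuid => $args{uuid}, state => 'Queued' }; },
    },
);

sub retry_op {
    # Call $operation with @args; on failure, report via $retry_callback and
    # retry with exponential backoff (1s, 2s, 4s, ...) until tries run out.
    my ($operation, $retry_callback, @args) = @_;
    foreach my $try_count (0..$retry_count) {
        my $next_try = time + (2 ** $try_count);
        my $result = eval { $operation->(@args); };
        return $result if !$@;
        if ($try_count < $retry_count) {
            $retry_callback->($try_count, $next_try, $@);
            my $sleep_time = $next_try - time;
            sleep($sleep_time) if $sleep_time > 0;
        }
    }
    die $@;
}

sub api_call {
    # Resolve a "/"-separated method name against the dispatch table, then
    # run it through retry_op(), logging every retried failure.
    my ($method_name, @args) = @_;
    my $method = \%api;
    $method = $method->{$_} for split(m{/}, $method_name);
    my $log_retry = sub {
        my ($try_count, $next_try_at, $errmsg) = @_;
        chomp $errmsg;
        warn strftime("%Y-%m-%d %H:%M:%S", localtime)
            . " API method $method_name failed (try $try_count): $errmsg. Retrying.\n";
    };
    return retry_op(sub { $method->(@_); }, $log_retry, @args);
}

# Example use: the UUID below is a made-up placeholder.
my $job = api_call("jobs/get", uuid => "zzzzz-8i9sb-0123456789abcde");
print "fetched job $job->{uuid} in state $job->{state}\n";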