use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
+use Data::Dumper;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
my $sth;
my @jobstep;
-my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; });
+my $User = api_call("users/current");
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
- $Job = retry_op(sub {
- $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
- });
+ $Job = api_call("jobs/get", uuid => $jobspec);
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
- eval { retry_op(sub {
- # lock() sets is_locked_by_uuid and changes state to Running.
- $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
- }); };
+ eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
if ($@) {
Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
$Job->{'started_at'} = gmtime;
$Job->{'state'} = 'Running';
- $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); });
+ $Job = api_call("jobs/create", job => $Job);
}
$job_id = $Job->{'uuid'};
}
else
{
- my $first_task = retry_op(sub {
- $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
- 'job_uuid' => $Job->{'uuid'},
- 'sequence' => 0,
- 'qsequence' => 0,
- 'parameters' => {},
- });
+ my $first_task = api_call("job_tasks/create", job_task => {
+ 'job_uuid' => $Job->{'uuid'},
+ 'sequence' => 0,
+ 'qsequence' => 0,
+ 'parameters' => {},
});
push @jobstep, { 'level' => 0,
'failures' => 0,
if ($cleanpid == 0)
{
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+ ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
exit (1);
}
while (1)
} else {
# $repo is none of the above. It must be the name of a hosted
# repository.
- my $arv_repo_list = retry_op(sub {
- $arv->{'repositories'}->{'list'}->execute(
- 'filters' => [['name','=',$repo]]);
- });
+ my $arv_repo_list = api_call("repositories/list",
+ 'filters' => [['name','=',$repo]]);
my @repos_found = @{$arv_repo_list->{'items'}};
my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
if ($n_found > 0) {
my @execargs = ("sh", "-c",
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
- # Note: this section is almost certainly unnecessary if we're
- # running tasks in docker containers.
my $installpid = fork();
if ($installpid == 0)
{
}
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
- $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+ $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
$ENV{"HOME"} = $ENV{"TASK_WORK"};
$ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
$command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+ $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+
# Dynamically configure the container to use the host system as its
# DNS server. Get the host's global addresses from the ip command,
# and turn them into docker --dns options using gawk.
$command .=
q{$(ip -o address show scope global |
gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
- $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
+
+ # The source tree and $destdir directory (which we have
+ # installed on the worker host) are available in the container,
+ # under the same path.
+ $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
+ $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
+
+ # Currently, we make arv-mount's mount point appear at /keep
+ # inside the container (instead of using the same path as the
+ # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
+ # crunch scripts and utilities must not rely on this. They must
+ # use $TASK_KEEPMOUNT.
$command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
- $command .= "--env=\QHOME=/home/crunch\E ";
+ $ENV{TASK_KEEPMOUNT} = "/keep";
+
+ # TASK_WORK is a plain docker data volume: it starts out empty,
+ # is writable, and persists until no containers use it any
+ # more. We don't use --volumes-from to share it with other
+ # containers: it is only accessible to this task, and it goes
+ # away when this task stops.
+ $command .= "--volume=\Q$ENV{TASK_WORK}\E ";
+
+ # JOB_WORK is also a plain docker data volume for now. TODO:
+ # Share a single JOB_WORK volume across all task containers on a
+ # given worker node, and delete it when the job ends (and, in
+ # case that doesn't work, when the next job starts).
+ $command .= "--volume=\Q$ENV{JOB_WORK}\E ";
+
while (my ($env_key, $env_val) = each %ENV)
{
- if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
- if ($env_key eq "TASK_WORK") {
- $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
- }
- elsif ($env_key eq "TASK_KEEPMOUNT") {
- $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
- }
- else {
- $command .= "--env=\Q$env_key=$env_val\E ";
- }
+ if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
+ $command .= "--env=\Q$env_key=$env_val\E ";
}
}
- $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
- $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
+ $command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
$command .= "stdbuf --output=0 --error=0 ";
- $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
+ $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
} else {
# Non-docker run
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
my @execargs = ('bash', '-c', $command);
srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# exec() failed, we assume nothing happened.
- Log(undef, "srun() failed on build script");
- die;
+ die "srun() failed on build script\n";
}
close("writer");
if (!defined $childpid)
while (my $manifest_line = <$orig_manifest>) {
$orig_manifest_text .= $manifest_line;
}
- my $output = retry_op(sub {
- $arv->{'collections'}->{'create'}->execute(
- 'collection' => {'manifest_text' => $orig_manifest_text});
- });
+ my $output = api_call("collections/create", collection => {
+ 'manifest_text' => $orig_manifest_text});
Log(undef, "output uuid " . $output->{uuid});
Log(undef, "output hash " . $output->{portable_data_hash});
$Job->update_attributes('output' => $output->{portable_data_hash});
my $newtask_list = [];
my $newtask_results;
do {
- $newtask_results = retry_op(sub {
- $arv->{'job_tasks'}->{'list'}->execute(
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- });
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
push(@$newtask_list, @{$newtask_results->{items}});
} while (@{$newtask_results->{items}});
foreach my $arvados_task (@$newtask_list) {
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
if (@stat && $stat[9] > $latest_refresh) {
$latest_refresh = scalar time;
- my $Job2 = retry_op(sub {
- $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
- });
+ my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
'cancelled_by_user_uuid',
'cancelled_by_client_uuid',
my $opts = shift || {};
my $stdin = shift;
my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
- print STDERR (join (" ",
- map { / / ? "'$_'" : $_ }
- (@$args)),
- "\n")
- if $ENV{CRUNCH_DEBUG};
+
+ $Data::Dumper::Terse = 1;
+ $Data::Dumper::Indent = 0;
+ my $show_cmd = Dumper($args);
+ $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
+ $show_cmd =~ s/\n/ /g;
+ warn "starting: $show_cmd\n";
if (defined $stdin) {
my $child = open STDIN, "-|";
# If not, return undef for both values.
my $locator = shift;
my ($streamname, $filename);
- my $image = retry_op(sub {
- $arv->{collections}->{get}->execute(uuid => $locator);
- });
+ my $image = api_call("collections/get", uuid => $locator);
if ($image) {
foreach my $line (split(/\n/, $image->{manifest_text})) {
my @tokens = split(/\s+/, $line);
}
sub retry_op {
- # Given a function reference, call it with the remaining arguments. If
- # it dies, retry it with exponential backoff until it succeeds, or until
- # the current retry_count is exhausted.
+ # Pass in two function references: the operation and a retry callback.
+ # The operation will be called with the remaining arguments.
+ # If it dies, retry it with exponential backoff until it succeeds,
+ # or until the current retry_count is exhausted. After each failure
+ # that can be retried, the retry callback will be called with
+ # the current try count (0-based), next try time, and error message.
my $operation = shift;
+ my $retry_callback = shift;
my $retries = retry_count();
foreach my $try_count (0..$retries) {
my $next_try = time + (2 ** $try_count);
if (!$@) {
return $result;
} elsif ($try_count < $retries) {
+ $retry_callback->($try_count, $next_try, $@);
my $sleep_time = $next_try - time;
sleep($sleep_time) if ($sleep_time > 0);
}
die($@ . "\n");
}
+sub api_call {
+  # Pass in a /-separated API method name (e.g. "jobs/get"), and
+  # arguments for it.  This function looks the method up under $arv,
+  # calls its execute() with those arguments through retry_op, and
+  # returns whatever retry_op returns.  Every failure that retry_op is
+  # about to retry is logged, together with the time of the next try.
+  # Dies (via retry_op) once the current retry_count is exhausted.
+  my $method_name = shift;
+  my $log_api_retry = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    # Drop the "at <this script> line N." suffix and flatten all
+    # whitespace so the error fits on a single log line.
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      # POSIX::strftime takes broken-down time fields (sec, min, hour,
+      # ...), not an epoch value: passing $next_try_at directly croaks
+      # with a usage error.  Convert with gmtime() and mark the
+      # timestamp as UTC so the log line is unambiguous.
+      my $next_try_fmt = strftime("%Y-%m-%dT%H:%M:%SZ",
+                                  gmtime($next_try_at));
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+  };
+  # Walk down from $arv one path component at a time, e.g.
+  # "jobs/get" resolves to $arv->{jobs}->{get}.
+  my $method = $arv;
+  foreach my $key (split(/\//, $method_name)) {
+    $method = $method->{$key};
+  }
+  # Inside the closure, @_ is whatever retry_op hands to the operation.
+  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
sub exit_status_s {
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".
# checkout-and-build
use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $task_work = $ENV{"TASK_WORK"};
for my $dir ($destdir, $task_work) {
- if ($dir) {
- make_path $dir;
- -e $dir or die "Failed to create temporary directory ($dir): $!";
- }
+ if ($dir) {
+ make_path $dir;
+ -e $dir or die "Failed to create temporary directory ($dir): $!";
+ }
+}
+
+if ($task_work) {
+ remove_tree($task_work, {keep_root => 1});
}
+
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
}
unlink "$destdir.commit";
+open STDERR_ORIG, ">&STDERR";
open STDOUT, ">", "$destdir.log";
open STDERR, ">&STDOUT";
if ($ENV{"DEBUG"}) {
print STDERR "@_\n";
}
- system (@_) == 0
- or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+ if (system (@_) != 0) {
+ my $err = $!;
+ my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+ open STDERR, ">&STDERR_ORIG";
+ system ("cat $destdir.log >&2");
+ die "@_ failed ($err): $exitstatus";
+ }
}
__DATA__