Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):
- crunch-job --job x-y-z
+ crunch-job --job x-y-z --git-dir /path/to/repo/.git
Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):
- crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
+ crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'
+
+ crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'
=head1 OPTIONS
=item --git-dir
-Path to .git directory where the specified commit is found.
+Path to a .git directory (or a git URL) where the commit given in the
+job's C<script_version> attribute is to be found. If this is I<not>
+given, the job's C<repository> attribute will be used.
=item --job-api-token
setup. This can speed up development and debugging when running jobs
locally.
+=item --job
+
+UUID of the job to run, or a JSON-encoded job resource without a
+UUID. If the latter is given, a new job object will be created.
+
=back
=head1 RUNNING JOBS LOCALLY
use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
+use Cwd qw(realpath);
+use Data::Dumper;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
my $arv = Arvados->new('apiVersion' => 'v1');
-my $User = $arv->{'users'}->{'current'}->execute;
-
my $Job;
my $job_id;
my $dbh;
my $sth;
+my @jobstep;
+
+my $User = api_call("users/current");
+
if ($jobspec =~ /^[-a-z\d]+$/)
{
# $jobspec is an Arvados UUID, not a JSON job specification
- $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ $Job = api_call("jobs/get", uuid => $jobspec);
if (!$force_unlock) {
# Claim this job, and make sure nobody else does
- eval {
- # lock() sets is_locked_by_uuid and changes state to Running.
- $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
- };
+ eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
if ($@) {
Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
exit EX_TEMPFAIL;
$Job->{'started_at'} = gmtime;
$Job->{'state'} = 'Running';
- $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
+ $Job = api_call("jobs/create", job => $Job);
}
$job_id = $Job->{'uuid'};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
+my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
+if ($? == 0) {
+ $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
+ chomp($gem_versions);
+ chop($gem_versions); # Closing parentheses
+} else {
+ $gem_versions = "";
+}
+Log(undef,
+ "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);
Log (undef, "check slurm allocation");
my @slot;
$ENV{"JOB_UUID"} = $job_id;
-my @jobstep;
my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_checked;
my $squeue_kill_checked;
-my $output_in_keep = 0;
my $latest_refresh = scalar time;
}
else
{
- my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
+ my $first_task = api_call("job_tasks/create", job_task => {
'job_uuid' => $Job->{'uuid'},
'sequence' => 0,
'qsequence' => 0,
'parameters' => {},
- });
+ });
push @jobstep, { 'level' => 0,
'failures' => 0,
'arvados_task' => $first_task,
must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}
-
-my $build_script;
-do {
- local $/ = undef;
- $build_script = <DATA>;
-};
+my $build_script = handle_readall(\*DATA);
my $nodelist = join(",", @node);
+my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
# Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
my $cleanpid = fork();
if ($cleanpid == 0)
{
+ # Find FUSE mounts that look like Keep mounts (the mount path has the
+ # word "keep") and unmount them. Then clean up work directories.
+ # TODO: When #5036 is done and widely deployed, we can get rid of the
+ # regular expression and just unmount everything with type fuse.keep.
srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+ ['bash', '-ec', 'mount -t fuse,fuse.keep | awk \'($3 ~ /\ykeep\y/){print $3}\' | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']);
exit (1);
}
while (1)
Log (undef, "Cleanup command exited ".exit_status_s($?));
}
+# If this job requires a Docker image, install that.
+my $docker_bin = "/usr/bin/docker.io";
+my ($docker_locator, $docker_stream, $docker_hash);
+if ($docker_locator = $Job->{docker_image_locator}) {
+ ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
+ if (!$docker_hash)
+ {
+ croak("No Docker image hash found from locator $docker_locator");
+ }
+ $docker_stream =~ s/^\.//;
+ my $docker_install_script = qq{
+if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+ arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+fi
+};
+ my $docker_pid = fork();
+ if ($docker_pid == 0)
+ {
+ srun (["srun", "--nodelist=" . join(',', @node)],
+ ["/bin/sh", "-ec", $docker_install_script]);
+ exit ($?);
+ }
+ while (1)
+ {
+ last if $docker_pid == waitpid (-1, WNOHANG);
+ freeze_if_want_freeze ($docker_pid);
+ select (undef, undef, undef, 0.1);
+ }
+ if ($? != 0)
+ {
+ croak("Installing Docker image from $docker_locator exited "
+ .exit_status_s($?));
+ }
+
+ if ($Job->{arvados_sdk_version}) {
+ # The job also specifies an Arvados SDK version. Add the SDKs to the
+ # tar file for the build script to install.
+ Log(undef, sprintf("Packing Arvados SDK version %s for installation",
+ $Job->{arvados_sdk_version}));
+ add_git_archive("git", "--git-dir=$git_dir", "archive",
+ "--prefix=.arvados.sdk/",
+ $Job->{arvados_sdk_version}, "sdk");
+ }
+}
-my $git_archive;
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
- # If we're in user-land (i.e., not called from crunch-dispatch)
- # script_version can be an absolute directory path, signifying we
- # should work straight out of that directory instead of using a git
- # commit.
+ # If script_version looks like an absolute path, *and* the --git-dir
+ # argument was not given -- which implies we were not invoked by
+ # crunch-dispatch -- we will use the given path as a working
+ # directory instead of resolving script_version to a git commit (or
+ # doing anything else with git).
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
$ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
}
else {
+ # Resolve the given script_version to a git commit sha1. Also, if
+ # the repository is remote, clone it into our local filesystem: this
+ # ensures "git archive" will work, and is necessary to reliably
+ # resolve a symbolic script_version like "master^".
$ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
- # Install requested code version
Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});
$ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
} else {
# $repo is none of the above. It must be the name of a hosted
# repository.
- my $arv_repo_list = $arv->{'repositories'}->{'list'}->execute(
- 'filters' => [['name','=',$repo]]
- )->{'items'};
- my $n_found = scalar @{$arv_repo_list};
+ my $arv_repo_list = api_call("repositories/list",
+ 'filters' => [['name','=',$repo]]);
+ my @repos_found = @{$arv_repo_list->{'items'}};
+ my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
if ($n_found > 0) {
Log(undef, "Repository '$repo' -> "
- . join(", ", map { $_->{'uuid'} } @{$arv_repo_list}));
+ . join(", ", map { $_->{'uuid'} } @repos_found));
}
if ($n_found != 1) {
croak("Error: Found $n_found repositories with name '$repo'.");
}
- $repo = $arv_repo_list->[0]->{'fetch_url'};
+ $repo = $repos_found[0]->{'fetch_url'};
$repo_location = 'remote';
}
Log(undef, "Using $repo_location repository '$repo'");
# our local cache first, since that's cheaper. (We don't want to
# do that with tags/branches though -- those change over time, so
# they should always be resolved by the remote repo.)
- if ($treeish =~ /^[0-9a-f]{3,40}$/s) {
+ if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
# Hide stderr because it's normal for this to fail:
my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
if ($? == 0 &&
- $sha1 =~ /^$treeish/ && # Don't use commit 123 @ branch abc!
+ # Careful not to resolve a branch named abcdeff to commit 1234567:
+ $sha1 =~ /^$treeish/ &&
$sha1 =~ /^([0-9a-f]{40})$/s) {
$commit = $1;
Log(undef, "Commit $commit already present in $local_repo");
}
$ENV{"CRUNCH_SRC_COMMIT"} = $commit;
- $git_archive = `$gitcmd archive ''\Q$commit\E`;
- if ($?) {
- croak("Error: $gitcmd archive exited ".exit_status_s($?));
- }
+ add_git_archive("$gitcmd archive ''\Q$commit\E");
}
+my $git_archive = combined_git_archive();
if (!defined $git_archive) {
Log(undef, "Skip install phase (no git archive)");
if ($have_slurm) {
my @execargs = ("sh", "-c",
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
- # Note: this section is almost certainly unnecessary if we're
- # running tasks in docker containers.
my $installpid = fork();
if ($installpid == 0)
{
freeze_if_want_freeze ($installpid);
select (undef, undef, undef, 0.1);
}
- Log (undef, "Install script exited ".exit_status_s($?));
-}
-
-if (!$have_slurm)
-{
- # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
- must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-}
-
-# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
-if ($docker_locator = $Job->{docker_image_locator}) {
- ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
- if (!$docker_hash)
- {
- croak("No Docker image hash found from locator $docker_locator");
- }
- $docker_stream =~ s/^\.//;
- my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-fi
-};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
- }
- if ($? != 0)
- {
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+ my $install_exited = $?;
+ Log (undef, "Install script exited ".exit_status_s($install_exited));
+ foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+ unlink($tar_filename);
}
+ exit (1) if $install_exited != 0;
}
foreach (qw (script script_version script_parameters runtime_constraints))
}
$ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
$ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
- $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
+ $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
$ENV{"HOME"} = $ENV{"TASK_WORK"};
$ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
$ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
- my $build_script_to_send = "";
my $command =
"if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
."&& cd $ENV{CRUNCH_TMP} ";
- if ($build_script)
- {
- $build_script_to_send = $build_script;
- $command .=
- "&& perl -";
- }
$command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_hash)
{
- $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
+ my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid";
+ $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
+ $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+
# Dynamically configure the container to use the host system as its
# DNS server. Get the host's global addresses from the ip command,
# and turn them into docker --dns options using gawk.
$command .=
q{$(ip -o address show scope global |
gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
- $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
+
+ # The source tree and $destdir directory (which we have
+ # installed on the worker host) are available in the container,
+ # under the same path.
+ $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E ";
+ $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E ";
+
+ # Currently, we make arv-mount's mount point appear at /keep
+ # inside the container (instead of using the same path as the
+ # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However,
+ # crunch scripts and utilities must not rely on this. They must
+ # use $TASK_KEEPMOUNT.
$command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
- $command .= "--env=\QHOME=/home/crunch\E ";
+ $ENV{TASK_KEEPMOUNT} = "/keep";
+
+ # TASK_WORK is almost exactly like a docker data volume: it
+ # starts out empty, is writable, and persists until no
+ # containers use it any more. We don't use --volumes-from to
+ # share it with other containers: it is only accessible to this
+ # task, and it goes away when this task stops.
+ #
+ # However, a docker data volume is writable only by root unless
+ # the mount point already happens to exist in the container with
+ # different permissions. Therefore, we [1] assume /tmp already
+ # exists in the image and is writable by the crunch user; [2]
+ # avoid putting TASK_WORK inside CRUNCH_TMP (which won't be
+ # writable if they are created by docker while setting up the
+ # other --volumes); and [3] create $TASK_WORK inside the
+ # container using $build_script.
+ $command .= "--volume=/tmp ";
+ $ENV{"TASK_WORK"} = "/tmp/crunch-job-task-work/$childslotname";
+ $ENV{"HOME"} = $ENV{"TASK_WORK"};
+ $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
+
+ # TODO: Share a single JOB_WORK volume across all task
+ # containers on a given worker node, and delete it when the job
+ # ends (and, in case that doesn't work, when the next job
+ # starts).
+ #
+ # For now, use the same approach as TASK_WORK above.
+ $ENV{"JOB_WORK"} = "/tmp/crunch-job-work";
+
while (my ($env_key, $env_val) = each %ENV)
{
- if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
- if ($env_key eq "TASK_WORK") {
- $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
- }
- elsif ($env_key eq "TASK_KEEPMOUNT") {
- $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
- }
- else {
- $command .= "--env=\Q$env_key=$env_val\E ";
- }
+ if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) {
+ $command .= "--env=\Q$env_key=$env_val\E ";
}
}
- $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
- $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
+ $command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
$command .= "stdbuf --output=0 --error=0 ";
- $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
+ $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
} else {
# Non-docker run
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
$command .= "stdbuf --output=0 --error=0 ";
- $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+ $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
my @execargs = ('bash', '-c', $command);
- srun (\@srunargs, \@execargs, undef, $build_script_to_send);
+ srun (\@srunargs, \@execargs, undef, $build_script);
# exec() failed, we assume nothing happened.
- Log(undef, "srun() failed on build script");
- die;
+ die "srun() failed on build script\n";
}
close("writer");
if (!defined $childpid)
{
$main::please_info = 0;
freeze();
- collate_output();
+ create_output_collection();
save_meta(1);
update_progress_stats();
}
$main::please_continue = 0;
goto THISROUND;
}
- $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
+ $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
readfrompipes ();
if (!reapchildren())
{
release_allocation();
freeze();
-my $collated_output = &collate_output();
+my $collated_output = &create_output_collection();
if (!$collated_output) {
- Log(undef, "output undef");
+ Log (undef, "Failed to write output collection");
}
else {
- eval {
- open(my $orig_manifest, '-|', 'arv-get', $collated_output)
- or die "failed to get collated manifest: $!";
- my $orig_manifest_text = '';
- while (my $manifest_line = <$orig_manifest>) {
- $orig_manifest_text .= $manifest_line;
- }
- my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
- 'manifest_text' => $orig_manifest_text,
- });
- Log(undef, "output uuid " . $output->{uuid});
- Log(undef, "output hash " . $output->{portable_data_hash});
- $Job->update_attributes('output' => $output->{portable_data_hash});
- };
- if ($@) {
- Log (undef, "Failed to register output manifest: $@");
- }
+ Log(undef, "output hash " . $collated_output);
+ $Job->update_attributes('output' => $collated_output);
}
Log (undef, "finish");
my $newtask_list = [];
my $newtask_results;
do {
- $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
+ $newtask_results = api_call(
+ "job_tasks/list",
'where' => {
'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
},
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
if (@stat && $stat[9] > $latest_refresh) {
$latest_refresh = scalar time;
- my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
+ my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
'cancelled_by_user_uuid',
'cancelled_by_client_uuid',
return $output_block;
}
-sub collate_output
+# create_output_collection generates a new collection containing the
+# output of each successfully completed task, and returns the
+# portable_data_hash for the new collection.
+#
+sub create_output_collection
{
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
- '--retries', put_retry_count());
- my $joboutput;
+ my $pid = open2($child_out, $child_in, 'python', '-c',
+ 'import arvados; ' .
+ 'import sys; ' .
+ 'print arvados.api()' .
+ '.collections()' .
+ '.create(body={"manifest_text":sys.stdin.read()})' .
+ '.execute()["portable_data_hash"]'
+ );
+
for (@jobstep)
{
next if (!exists $_->{'arvados_task'}->{'output'} ||
my $output = $_->{'arvados_task'}->{output};
if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
{
- $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
print $child_in $output;
}
- elsif (@jobstep == 1)
- {
- $joboutput = $output;
- last;
- }
elsif (defined (my $outblock = fetch_block ($output)))
{
- $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
print $child_in $outblock;
}
else
}
$child_in->close;
- if (!defined $joboutput) {
- my $s = IO::Select->new($child_out);
- if ($s->can_read(120)) {
- sysread($child_out, $joboutput, 64 * 1024 * 1024);
- chomp($joboutput);
- # TODO: Ensure exit status == 0.
- } else {
- Log (undef, "timed out reading from 'arv-put'");
- }
+ my $joboutput;
+ my $s = IO::Select->new($child_out);
+ if ($s->can_read(120)) {
+ sysread($child_out, $joboutput, 64 * 1024 * 1024);
+ chomp($joboutput);
+ # TODO: Ensure exit status == 0.
+ } else {
+ Log (undef, "timed out while creating output collection");
}
# TODO: kill $pid instead of waiting, now that we've decided to
# ignore further output.
my $message = "@_ at $file line $line\n";
Log (undef, $message);
freeze() if @jobstep_todo;
- collate_output() if @jobstep_todo;
- cleanup() if $Job;
- save_meta() if log_writer_is_active();
+ create_output_collection() if @jobstep_todo;
+ cleanup();
+ save_meta();
die;
}
sub cleanup
{
+ return unless $Job;
if ($Job->{'state'} eq 'Cancelled') {
$Job->update_attributes('finished_at' => scalar gmtime);
} else {
{
my $justcheckpoint = shift; # false if this will be the last meta saved
return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
+ return unless log_writer_is_active();
my $loglocator = log_writer_finish();
Log (undef, "log manifest is $loglocator");
}
}
freeze();
- collate_output();
+ create_output_collection();
cleanup();
save_meta();
exit 1;
my $opts = shift || {};
my $stdin = shift;
my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
- print STDERR (join (" ",
- map { / / ? "'$_'" : $_ }
- (@$args)),
- "\n")
- if $ENV{CRUNCH_DEBUG};
+
+ $Data::Dumper::Terse = 1;
+ $Data::Dumper::Indent = 0;
+ my $show_cmd = Dumper($args);
+ $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
+ $show_cmd =~ s/\n/ /g;
+ warn "starting: $show_cmd\n";
if (defined $stdin) {
my $child = open STDIN, "-|";
# If not, return undef for both values.
my $locator = shift;
my ($streamname, $filename);
- if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
+ my $image = api_call("collections/get", uuid => $locator);
+ if ($image) {
foreach my $line (split(/\n/, $image->{manifest_text})) {
my @tokens = split(/\s+/, $line);
next if (!@tokens);
}
}
-sub put_retry_count {
- # Calculate a --retries argument for arv-put that will have it try
- # approximately as long as this Job has been running.
- my $stoptime = shift || time;
- my $starttime = $jobstep[0]->{starttime};
- my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
- my $retries = 0;
- while ($timediff >= 2) {
- $retries++;
- $timediff /= 2;
+sub retry_count {
+  # Calculate the number of times an operation should be retried,
+  # assuming exponential backoff, and that we're willing to retry as
+  # long as tasks have been running. Enforce a minimum of 3 retries.
+  my ($starttime, $endtime, $timediff, $retries);
+  if (@jobstep) {
+    $starttime = $jobstep[0]->{starttime};
+    $endtime = $jobstep[-1]->{finishtime};
+  }
+  if (!defined($starttime)) {
+    # No task has started yet, so there is no runtime to base an
+    # estimate on.
+    $timediff = 0;
+  } elsif (!defined($endtime)) {
+    # Tasks are still running: use the elapsed time so far.
+    $timediff = time - $starttime;
+  } else {
+    # All tasks finished: total task runtime, discounted by the time
+    # that has already elapsed since the last task ended.
+    $timediff = ($endtime - $starttime) - (time - $endtime);
+  }
+  if ($timediff > 0) {
+    # With 2**n backoff, log2(timediff) retries gives a total retry
+    # budget roughly comparable to the tasks' runtime.
+    $retries = int(log($timediff) / log(2));
+  } else {
+    $retries = 1; # Use the minimum.
+  }
  }
  return ($retries > 3) ? $retries : 3;
}
+sub retry_op {
+  # Pass in two function references.
+  # This method will be called with the remaining arguments.
+  # If it dies, retry it with exponential backoff until it succeeds,
+  # or until the current retry_count is exhausted. After each failure
+  # that can be retried, the second function will be called with
+  # the current try count (0-based), next try time, and error message.
+  my $operation = shift;
+  my $retry_callback = shift;
+  my $retries = retry_count();
+  foreach my $try_count (0..$retries) {
+    # Exponential backoff: 1s, 2s, 4s, ... between successive attempts.
+    my $next_try = time + (2 ** $try_count);
+    my $result = eval { $operation->(@_); };
+    if (!$@) {
+      return $result;
+    } elsif ($try_count < $retries) {
+      $retry_callback->($try_count, $next_try, $@);
+      # Sleep only for whatever remains of the backoff interval (the
+      # operation itself may already have consumed some of it).
+      my $sleep_time = $next_try - time;
+      sleep($sleep_time) if ($sleep_time > 0);
+    }
+  }
+  # Ensure the error message ends in a newline, so Perl doesn't add
+  # retry_op's line number to it.
+  chomp($@);
+  die($@ . "\n");
+}
+
+sub api_call {
+  # Pass in a /-separated API method name, and arguments for it.
+  # This function will call that method, retrying as needed until
+  # the current retry_count is exhausted, with a log on the first failure.
+  my $method_name = shift;
+  my $log_api_retry = sub {
+    my ($try_count, $next_try_at, $errmsg) = @_;
+    # Strip the "at <script> line N." suffix Perl appends to die
+    # messages, then collapse whitespace so the error fits on one
+    # log line.
+    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
+    $errmsg =~ s/\s/ /g;
+    $errmsg =~ s/\s+$//;
+    my $retry_msg;
+    if ($next_try_at < time) {
+      $retry_msg = "Retrying.";
+    } else {
+      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
+      $retry_msg = "Retrying at $next_try_fmt.";
+    }
+    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
+  };
+  # Walk the Arvados client object to resolve the method, e.g.
+  # "jobs/lock" -> $arv->{jobs}->{lock}.
+  my $method = $arv;
+  foreach my $key (split(/\//, $method_name)) {
+    $method = $method->{$key};
+  }
+  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
+}
+
sub exit_status_s {
# Given a $?, return a human-readable exit code string like "0" or
# "1" or "0 with signal 1" or "1 with signal 11".
return $s;
}
+sub handle_readall {
+  # Pass in a glob reference to a file handle.
+  # Read all its contents and return them as a string.
+  my $fh_glob_ref = shift;
+  # Undefining the (localized) input record separator makes the
+  # readline below slurp the entire remaining file in one go.
+  local $/ = undef;
+  return <$fh_glob_ref>;
+}
+
+sub tar_filename_n {
+  # Return the path of the Nth saved git archive for this job,
+  # i.e. "$CRUNCH_TMP/git.<job uuid>.<n>.tar".
+  my $n = shift;
+  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
+}
+
+sub add_git_archive {
+  # Pass in a git archive command as a string or list, a la system().
+  # This method will save its output to be included in the archive sent to the
+  # build script.
+  my $git_input;
+  $git_tar_count++;
+  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+    croak("Failed to save git archive: $!");
+  }
+  # ">&GIT_ARCHIVE" makes open2 redirect the child's stdout straight
+  # into the tar file we just opened.
+  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+  # git archive reads nothing from stdin; close it immediately.
+  close($git_input);
+  waitpid($git_pid, 0);
+  close(GIT_ARCHIVE);
+  if ($?) {
+    croak("Failed to save git archive: git exited " . exit_status_s($?));
+  }
+}
+
+sub combined_git_archive {
+  # Combine all saved tar archives into a single archive, then return its
+  # contents in a string. Return undef if no archives have been saved.
+  if ($git_tar_count < 1) {
+    return undef;
+  }
+  my $base_tar_name = tar_filename_n(1);
+  # Concatenate archives 2..N onto the first one in place, using
+  # tar's -A (--concatenate) operation.
+  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
+    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
+    if ($tar_exit != 0) {
+      croak("Error preparing build archive: tar -A exited " .
+            exit_status_s($tar_exit));
+    }
+  }
+  if (!open(GIT_TAR, "<", $base_tar_name)) {
+    croak("Could not open build archive: $!");
+  }
+  my $tar_contents = handle_readall(\*GIT_TAR);
+  close(GIT_TAR);
+  return $tar_contents;
+}
+
__DATA__
#!/usr/bin/perl
-
-# checkout-and-build
+#
+# This is crunch-job's internal dispatch script. crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally. It gets called in two modes:
+#
+# * No arguments: Installation mode. Read a tar archive from the DATA
+# file handle; it includes the Crunch script's source code, and
+# maybe SDKs as well. Those should be installed in the proper
+# locations. This runs outside of any Docker container, so don't try to
+# introspect Crunch's runtime environment.
+#
+# * With arguments: Crunch script run mode. This script should set up the
+# environment, then run the command specified in the arguments. This runs
+# inside any Docker container.
use Fcntl ':flock';
-use File::Path qw( make_path );
+use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
+my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};
-for my $dir ($destdir, $task_work) {
- if ($dir) {
- make_path $dir;
- -e $dir or die "Failed to create temporary directory ($dir): $!";
+for my $dir ($destdir, $job_work, $task_work) {
+ if ($dir) {
+ make_path $dir;
+ -e $dir or die "Failed to create temporary directory ($dir): $!";
+ }
+}
+
+if ($task_work) {
+ remove_tree($task_work, {keep_root => 1});
+}
+
+open(STDOUT_ORIG, ">&", STDOUT);
+open(STDERR_ORIG, ">&", STDERR);
+open(STDOUT, ">>", "$destdir.log");
+open(STDERR, ">&", STDOUT);
+
+### Crunch script run mode
+if (@ARGV) {
+ # We want to do routine logging during task 0 only. This gives the user
+ # the information they need, but avoids repeating the information for every
+ # task.
+ my $Log;
+ if ($ENV{TASK_SEQUENCE} eq "0") {
+ $Log = sub {
+ my $msg = shift;
+ printf STDERR_ORIG "[Crunch] $msg\n", @_;
+ };
+ } else {
+ $Log = sub { };
+ }
+
+ my $python_src = "$install_dir/python";
+ my $venv_dir = "$job_work/.arvados.venv";
+ my $venv_built = -e "$venv_dir/bin/activate";
+ if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
+ shell_or_die("virtualenv", "--quiet", "--system-site-packages",
+ "--python=python2.7", $venv_dir);
+ shell_or_die("$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
+ $venv_built = 1;
+ $Log->("Built Python SDK virtualenv");
+ }
+
+ my $pip_bin = "pip";
+ if ($venv_built) {
+ $Log->("Running in Python SDK virtualenv");
+ $pip_bin = "$venv_dir/bin/pip";
+ my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+ @ARGV = ("/bin/sh", "-ec",
+ ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
+ } elsif (-d $python_src) {
+ $Log->("Warning: virtualenv not found inside Docker container default " .
+ "\$PATH. Can't install Python SDK.");
+ }
+
+ my $pkgs = `(\Q$pip_bin\E freeze 2>/dev/null | grep arvados) || dpkg-query --show '*arvados*'`;
+ if ($pkgs) {
+ $Log->("Using Arvados SDK:");
+ foreach my $line (split /\n/, $pkgs) {
+ $Log->($line);
+ }
+ } else {
+ $Log->("Arvados SDK packages not found");
+ }
+
+ while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+ my $sdk_path = "$install_dir/$sdk_dir";
+ if (-d $sdk_path) {
+ if ($ENV{$sdk_envkey}) {
+ $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+ } else {
+ $ENV{$sdk_envkey} = $sdk_path;
+ }
+ $Log->("Arvados SDK added to %s", $sdk_envkey);
}
+ }
+
+ close(STDOUT);
+ close(STDERR);
+ open(STDOUT, ">&", STDOUT_ORIG);
+ open(STDERR, ">&", STDERR_ORIG);
+ exec(@ARGV);
+ die "Cannot exec `@ARGV`: $!";
}
+### Installation mode
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
- if (@ARGV) {
- exec(@ARGV);
- die "Cannot exec `@ARGV`: $!";
- } else {
- exit 0;
- }
+ # This version already installed -> nothing to do.
+ exit(0);
}
unlink "$destdir.commit";
-open STDOUT, ">", "$destdir.log";
-open STDERR, ">&STDOUT";
-
mkdir $destdir;
-my @git_archive_data = <DATA>;
-if (@git_archive_data) {
- open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
- print TARX @git_archive_data;
- if(!close(TARX)) {
- die "'tar -C $destdir -xf -' exited $?: $!";
- }
+open TARX, "|-", "tar", "-xC", $destdir;
+{
+ local $/ = undef;
+ print TARX <DATA>;
+}
+if(!close(TARX)) {
+ die "'tar -xC $destdir' exited $?: $!";
}
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
mkdir $install_dir;
-for my $src_path ("$destdir/arvados/sdk/python") {
- if (-d $src_path) {
- shell_or_die ("virtualenv", $install_dir);
- shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+ foreach my $sdk_lang (("python",
+ map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS))) {
+ if (-d "$sdk_root/$sdk_lang") {
+ if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+ die "Failed to install $sdk_lang SDK: $!";
+ }
+ }
}
}
+my $python_dir = "$install_dir/python";
+if ((-d $python_dir) and can_run("python2.7") and
+ (system("python2.7", "$python_dir/setup.py", "--quiet", "egg_info") != 0)) {
+ # egg_info failed, probably when it asked git for a build tag.
+ # Specify no build tag.
+ open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
+ print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
+ close($pysdk_cfg);
+}
+
if (-e "$destdir/crunch_scripts/install") {
shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
close L;
-if (@ARGV) {
- exec(@ARGV);
- die "Cannot exec `@ARGV`: $!";
-} else {
- exit 0;
+sub can_run {
+  # Return true if the named command is found on the current $PATH,
+  # as reported by the exit status of `which`.
+  my $command_name = shift;
+  open(my $which, "-|", "which", $command_name);
+  # Drain the output; only the exit status matters here.
+  while (<$which>) { }
+  close($which);
+  # close() sets $? to the child's wait status: 0 means found.
+  return ($? == 0);
}
sub shell_or_die
if ($ENV{"DEBUG"}) {
print STDERR "@_\n";
}
- system (@_) == 0
- or die "@_ failed: $! exit 0x".sprintf("%x",$?);
+ if (system (@_) != 0) {
+ my $err = $!;
+ my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f);
+ open STDERR, ">&STDERR_ORIG";
+ system ("cat $destdir.log >&2");
+ die "@_ failed ($err): $exitstatus";
+ }
}
__DATA__