must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}
-
-my $build_script;
-do {
- local $/ = undef;
- $build_script = <DATA>;
-};
+my $build_script = handle_readall(\*DATA);
my $nodelist = join(",", @node);
+my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
# Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
Log (undef, "Cleanup command exited ".exit_status_s($?));
}
+# If this job requires a Docker image, install that.
+# The install runs in a forked child (through srun on every node in @node)
+# while this process polls for its completion.
+my $docker_bin = "/usr/bin/docker.io";
+my ($docker_locator, $docker_stream, $docker_hash);
+if ($docker_locator = $Job->{docker_image_locator}) {
+  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
+  if (!$docker_hash)
+  {
+    croak("No Docker image hash found from locator $docker_locator");
+  }
+  # Strip a leading "." from the stream name before building the arv-get path.
+  $docker_stream =~ s/^\.//;
+  # Shell snippet run on the nodes: load the image tarball only when
+  # `docker images` does not already list this hash.
+  my $docker_install_script = qq{
+if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+ arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+fi
+};
+  my $docker_pid = fork();
+  if (!defined $docker_pid)
+  {
+    # fork() returns undef on failure. Without this check the numeric
+    # comparison below would treat undef as 0 and run the child branch
+    # (including its exit) in the parent process.
+    croak("Failed to fork Docker install process: $!");
+  }
+  if ($docker_pid == 0)
+  {
+    # Child: run the install script and exit with the resulting status.
+    srun (["srun", "--nodelist=" . join(',', @node)],
+          ["/bin/sh", "-ec", $docker_install_script]);
+    exit ($?);
+  }
+  # Parent: poll every 0.1s until the install child has been reaped.
+  # Note that waitpid(-1, ...) collects any finished child, not only
+  # $docker_pid.
+  while (1)
+  {
+    last if $docker_pid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($docker_pid);
+    select (undef, undef, undef, 0.1);
+  }
+  if ($? != 0)
+  {
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
+  }
+
+  if ($Job->{arvados_sdk_version}) {
+    # The job also specifies an Arvados SDK version.  Add the SDKs to the
+    # tar file for the build script to install.
+    add_git_archive("git", "--git-dir=$git_dir", "archive",
+                    "--prefix=.arvados.sdk/",
+                    $Job->{arvados_sdk_version}, "sdk");
+  }
+}
-my $git_archive;
if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
# If script_version looks like an absolute path, *and* the --git-dir
# argument was not given -- which implies we were not invoked by
}
$ENV{"CRUNCH_SRC_COMMIT"} = $commit;
- $git_archive = `$gitcmd archive ''\Q$commit\E`;
- if ($?) {
- croak("Error: $gitcmd archive exited ".exit_status_s($?));
- }
+ add_git_archive("$gitcmd archive ''\Q$commit\E");
}
+my $git_archive = combined_git_archive();
if (!defined $git_archive) {
Log(undef, "Skip install phase (no git archive)");
if ($have_slurm) {
}
my $install_exited = $?;
Log (undef, "Install script exited ".exit_status_s($install_exited));
- exit (1) if $install_exited != 0;
-}
-
-if (!$have_slurm)
-{
- # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
- must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-}
-
-# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
-if ($docker_locator = $Job->{docker_image_locator}) {
- ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
- if (!$docker_hash)
- {
- croak("No Docker image hash found from locator $docker_locator");
- }
- $docker_stream =~ s/^\.//;
- my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
- arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-fi
-};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/sh", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
- }
- if ($? != 0)
- {
- croak("Installing Docker image from $docker_locator exited "
- .exit_status_s($?));
+ foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+ unlink($tar_filename);
}
+ exit (1) if $install_exited != 0;
}
foreach (qw (script script_version script_parameters runtime_constraints))
return $s;
}
+sub handle_readall {
+  # Slurp a file handle.
+  # Argument: a glob reference (e.g. \*DATA) to an open file handle.
+  # Returns everything that is left to read from it.
+  my ($handle) = @_;
+  # With no record separator, a single read returns the rest of the handle.
+  local $/;
+  return readline($handle);
+}
+
+sub tar_filename_n {
+  # Build the path of the Nth saved git archive:
+  #   $ENV{CRUNCH_TMP}/git.<job id>.<N>.tar
+  my ($index) = @_;
+  my @path_parts = ($ENV{CRUNCH_TMP}, $job_id, $index);
+  return sprintf("%s/git.%s.%d.tar", @path_parts);
+}
+
+sub add_git_archive {
+  # Pass in a git archive command as a string or list, a la system().
+  # This method will save its output to be included in the archive sent to the
+  # build script.
+  #
+  # The output goes to the next numbered tar file (see tar_filename_n) and
+  # $git_tar_count is incremented.  Croaks if the tar file cannot be
+  # created or closed, or if the command exits non-zero.
+  my $git_input;
+  $git_tar_count++;
+  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+    croak("Failed to save git archive: $!");
+  }
+  # ">&GIT_ARCHIVE" makes open2 send the child's stdout straight to the file.
+  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+  close($git_input);
+  waitpid($git_pid, 0);
+  # Capture the child's status right away, so nothing that runs afterwards
+  # can overwrite $? before it is inspected.
+  my $git_exited = $?;
+  my $closed_ok = close(GIT_ARCHIVE);
+  my $close_error = $!;
+  if ($git_exited) {
+    croak("Failed to save git archive: git exited " .
+          exit_status_s($git_exited));
+  }
+  if (!$closed_ok) {
+    croak("Failed to save git archive: $close_error");
+  }
+}
+
+sub combined_git_archive {
+  # Merge every tar archive saved by add_git_archive into the first one
+  # (using `tar -A`), then return the merged file's contents as a string.
+  # Returns undef when no archive has been saved.
+  return undef if $git_tar_count < 1;
+
+  my $combined_path = tar_filename_n(1);
+  for my $n (2 .. $git_tar_count) {
+    my $status = system("tar", "-Af", $combined_path, tar_filename_n($n));
+    next if $status == 0;
+    croak("Error preparing build archive: tar -A exited " .
+          exit_status_s($status));
+  }
+
+  open(GIT_TAR, "<", $combined_path)
+      or croak("Could not open build archive: $!");
+  my $tar_contents = handle_readall(\*GIT_TAR);
+  close(GIT_TAR);
+  return $tar_contents;
+}
+
__DATA__
#!/usr/bin/perl
-
-# checkout-and-build
+#
+# This is crunch-job's internal dispatch script. crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally. It gets called in two modes:
+#
+# * No arguments: Installation mode. Read a tar archive from the DATA
+# file handle; it includes the Crunch script's source code, and
+# maybe SDKs as well. Those should be installed in the proper
+# locations.
+#
+# * With arguments: Crunch script run mode. This script should set up the
+# environment, then run the command specified in the arguments.
use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};
remove_tree($task_work, {keep_root => 1});
}
-my @git_archive_data = <DATA>;
-if (!@git_archive_data) {
- # Nothing to extract -> nothing to install.
- run_argv_and_exit();
+if (@ARGV) {
+ if (-e "$install_dir/bin/activate") {
+ my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+ @ARGV = ("/bin/sh", "-ec",
+ ". \Q$install_dir/bin/activate\E; exec $orig_argv");
+ }
+ while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+ my $sdk_path = "$install_dir/$sdk_dir";
+ if (-d $sdk_path) {
+ if ($ENV{$sdk_envkey}) {
+ $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+ } else {
+ $ENV{$sdk_envkey} = $sdk_path;
+ }
+ }
+ }
+ exec(@ARGV);
+ die "Cannot exec `@ARGV`: $!";
}
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
# This version already installed -> nothing to do.
- run_argv_and_exit();
+ exit(0);
}
unlink "$destdir.commit";
open STDERR, ">&STDOUT";
mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX @git_archive_data;
+open TARX, "|-", "tar", "-xC", $destdir;
+{
+ local $/ = undef;
+ print TARX <DATA>;
+}
if(!close(TARX)) {
- die "'tar -C $destdir -xf -' exited $?: $!";
+ die "'tar -xC $destdir' exited $?: $!";
}
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
mkdir $install_dir;
-for my $src_path ("$destdir/arvados/sdk/python") {
- if (-d $src_path) {
- shell_or_die ("virtualenv", $install_dir);
- shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+ if (can_run("virtualenv")) {
+ shell_or_die("virtualenv", "--python=python2.7", "--system-site-packages",
+ $install_dir);
+ shell_or_die("$install_dir/bin/pip", "install", "$sdk_root/python");
+ }
+
+ foreach my $sdk_lang (map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS)) {
+ if (-d "$sdk_root/$sdk_lang") {
+ if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+ die "Failed to install $sdk_lang SDK: $!";
+ }
+ }
}
}
close L;
-run_argv_and_exit();
-
-sub run_argv_and_exit
-{
- if (@ARGV) {
- exec(@ARGV);
- die "Cannot exec `@ARGV`: $!";
- } else {
- exit 0;
- }
+sub can_run {
+  # Report whether $command_name can be found, according to `which`.
+  # Returns true only when `which` was started and exited with status 0.
+  my $command_name = shift;
+  # If `which` itself cannot be started, $? still holds the status of some
+  # earlier child (possibly 0), so answer "no" instead of consulting it.
+  open(my $which, "-|", "which", $command_name) or return 0;
+  # Drain and discard the output; only the exit status matters.
+  while (<$which>) { }
+  # Closing the pipe waits for `which` and stores its exit status in $?.
+  close($which);
+  return ($? == 0);
}
sub shell_or_die
class Dispatcher
include ApplicationHelper
+ def initialize
+ @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
+ if @crunch_job_bin.empty?
+ raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
+ end
+
+ @arvados_internal = Rails.configuration.git_internal_dir
+ if not File.exists? @arvados_internal
+ $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
+ raise "No internal git repository available" unless ($? == 0)
+ end
+
+ @repo_root = Rails.configuration.git_repositories_dir
+ @authorizations = {}
+ @did_recently = {}
+ @fetched_commits = {}
+ @git_tags = {}
+ @node_state = {}
+ @pipe_auth_tokens = {}
+ @running = {}
+ @todo = []
+ @todo_pipelines = []
+ end
+
def sysuser
return act_as_system_user
end
def refresh_todo
- @todo = []
if $options[:jobs]
@todo = Job.queue.select(&:repository)
end
- @todo_pipelines = []
if $options[:pipelines]
@todo_pipelines = PipelineInstance.queue
end
def update_node_status
return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
- @node_state ||= {}
slurm_status.each_pair do |hostname, slurmdata|
next if @node_state[hostname] == slurmdata
begin
end
end
+ def git_cmd(cmd_s)
+ "git --git-dir=#{@arvados_internal.shellescape} #{cmd_s}"
+ end
+
+ def get_authorization(job)
+ if @authorizations[job.uuid] and
+ @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+ # We already made a token for this job, but we need a new one
+ # because modified_by_user_uuid has changed (the job will run
+ # as a different user).
+ @authorizations[job.uuid].update_attributes expires_at: Time.now
+ @authorizations[job.uuid] = nil
+ end
+ if not @authorizations[job.uuid]
+ auth = ApiClientAuthorization.
+ new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+ api_client_id: 0)
+ if not auth.save
+ $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
+ else
+ @authorizations[job.uuid] = auth
+ end
+ end
+ @authorizations[job.uuid]
+ end
+
+ def get_commit(repo_name, commit_hash)
+ # @fetched_commits[V]==true if we know commit V exists in the
+ # arvados_internal git repository.
+ if !@fetched_commits[commit_hash]
+ src_repo = File.join(@repo_root, repo_name + '.git')
+ if not File.exists? src_repo
+ src_repo = File.join(@repo_root, repo_name, '.git')
+ if not File.exists? src_repo
+ fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
+ return nil
+ end
+ end
+
+ # check if the commit needs to be fetched or not
+ commit_rev = `#{git_cmd("rev-list -n1 #{commit_hash.shellescape} 2>/dev/null")}`.chomp
+ unless $? == 0 and commit_rev == commit_hash
+ # commit does not exist in internal repository, so import the source repository using git fetch-pack
+ cmd = git_cmd("fetch-pack --no-progress --all #{src_repo.shellescape}")
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts `#{cmd}`
+ unless $? == 0
+ fail_job job, "git fetch-pack failed"
+ return nil
+ end
+ end
+ @fetched_commits[commit_hash] = true
+ end
+ @fetched_commits[commit_hash]
+ end
+
+ def tag_commit(commit_hash, tag_name)
+ # @git_tags[T]==V if we know commit V has been tagged T in the
+ # arvados_internal repository.
+ if not @git_tags[tag_name]
+ cmd = git_cmd("tag #{tag_name.shellescape} #{commit_hash.shellescape} 2>/dev/null")
+ $stderr.puts "dispatch: #{cmd}"
+ $stderr.puts `#{cmd}`
+ unless $? == 0
+ # git tag failed. This may be because the tag already exists, so check for that.
+ tag_rev = `#{git_cmd("rev-list -n1 #{tag_name.shellescape}")}`.chomp
+ if $? == 0
+ # We got a revision back
+ if tag_rev != commit_hash
+ # Uh oh, the tag doesn't point to the revision we were expecting.
+ # Someone has been monkeying with the job record and/or git.
+ fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
+ return nil
+ end
+ # we're okay (fall through to setting @git_tags below)
+ else
+ # git rev-list failed for some reason.
+ fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
+ return nil
+ end
+ end
+ # 'git tag' was successful, or there is an existing tag that points to the same revision.
+ @git_tags[tag_name] = commit_hash
+ elsif @git_tags[tag_name] != commit_hash
+ fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
+ return nil
+ end
+ @git_tags[tag_name]
+ end
+
def start_jobs
@todo.each do |job|
next if @running[job.uuid]
"GEM_PATH=#{ENV['GEM_PATH']}")
end
- @authorizations ||= {}
- if @authorizations[job.uuid] and
- @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
- # We already made a token for this job, but we need a new one
- # because modified_by_user_uuid has changed (the job will run
- # as a different user).
- @authorizations[job.uuid].update_attributes expires_at: Time.now
- @authorizations[job.uuid] = nil
- end
- if not @authorizations[job.uuid]
- auth = ApiClientAuthorization.
- new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
- api_client_id: 0)
- if not auth.save
- $stderr.puts "dispatch: auth.save failed"
- next
- end
- @authorizations[job.uuid] = auth
- end
-
- crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
- if crunch_job_bin == ''
- raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
- end
-
- arvados_internal = Rails.configuration.git_internal_dir
- if not File.exists? arvados_internal
- $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
- end
-
- git = "git --git-dir=#{arvados_internal.shellescape}"
-
- # @fetched_commits[V]==true if we know commit V exists in the
- # arvados_internal git repository.
- @fetched_commits ||= {}
- if !@fetched_commits[job.script_version]
-
- repo_root = Rails.configuration.git_repositories_dir
- src_repo = File.join(repo_root, job.repository + '.git')
- if not File.exists? src_repo
- src_repo = File.join(repo_root, job.repository, '.git')
- if not File.exists? src_repo
- fail_job job, "No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
- next
- end
- end
-
- # check if the commit needs to be fetched or not
- commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
- unless $? == 0 and commit_rev == job.script_version
- # commit does not exist in internal repository, so import the source repository using git fetch-pack
- cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts `#{cmd}`
- unless $? == 0
- fail_job job, "git fetch-pack failed"
- next
- end
- end
- @fetched_commits[job.script_version] = true
- end
-
- # @job_tags[J]==V if we know commit V has been tagged J in the
- # arvados_internal repository. (J is a job UUID, V is a commit
- # sha1.)
- @job_tags ||= {}
- if not @job_tags[job.uuid]
- cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape} 2>/dev/null"
- $stderr.puts "dispatch: #{cmd}"
- $stderr.puts `#{cmd}`
- unless $? == 0
- # git tag failed. This may be because the tag already exists, so check for that.
- tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape}`.chomp
- if $? == 0
- # We got a revision back
- if tag_rev != job.script_version
- # Uh oh, the tag doesn't point to the revision we were expecting.
- # Someone has been monkeying with the job record and/or git.
- fail_job job, "Existing tag #{job.uuid} points to commit #{tag_rev} but expected commit #{job.script_version}"
- next
- end
- # we're okay (fall through to setting @job_tags below)
- else
- # git rev-list failed for some reason.
- fail_job job, "'git tag' for #{job.uuid} failed but did not find any existing tag using 'git rev-list'"
- next
- end
- end
- # 'git tag' was successful, or there is an existing tag that points to the same revision.
- @job_tags[job.uuid] = job.script_version
- elsif @job_tags[job.uuid] != job.script_version
- fail_job job, "Existing tag #{job.uuid} points to commit #{@job_tags[job.uuid]} but this job uses commit #{job.script_version}"
- next
+ ready = (get_authorization(job) and
+ get_commit(job.repository, job.script_version) and
+ tag_commit(job.script_version, job.uuid))
+ if ready and job.arvados_sdk_version
+ ready = (get_commit("arvados", job.arvados_sdk_version) and
+ tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
end
+ next unless ready
- cmd_args << crunch_job_bin
- cmd_args << '--job-api-token'
- cmd_args << @authorizations[job.uuid].api_token
- cmd_args << '--job'
- cmd_args << job.uuid
- cmd_args << '--git-dir'
- cmd_args << arvados_internal
+ cmd_args += [@crunch_job_bin,
+ '--job-api-token', @authorizations[job.uuid].api_token,
+ '--job', job.uuid,
+ '--git-dir', @arvados_internal]
$stderr.puts "dispatch: #{cmd_args.join ' '}"
def run
act_as_system_user
- @running ||= {}
- @pipe_auth_tokens ||= { }
$stderr.puts "dispatch: ready"
while !$signal[:term] or @running.size > 0
read_pipes
protected
def did_recently(thing, min_interval)
- @did_recently ||= {}
if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
@did_recently[thing] = Time.now
false