4027: Crunch installs jobs' requested arvados_sdk_version.
authorBrett Smith <brett@curoverse.com>
Mon, 24 Nov 2014 21:55:38 +0000 (16:55 -0500)
committerBrett Smith <brett@curoverse.com>
Mon, 24 Nov 2014 21:56:39 +0000 (16:56 -0500)
* crunch-dispatch fetches the requested SDK version into its internal
  git repository, just like it does for the Crunch script.  Refactored
  crunch-dispatch to make that code reusable.

* crunch-job's main script archives the sdk subdirectory as of that
  commit, sending it along to compute nodes in the same .tar as the
  Crunch script, under .arvados.sdk.

* crunch-job's __DATA__ dispatch section looks for the SDK under
  .arvados.sdk, and installs it as much as possible.

Since I was messing with it so much already, I changed the semantics
of crunch-job's __DATA__ section: it is now either in installation
mode or run mode, based on whether there's anything in @ARGV.  I
confirmed that this is consistent with current calls to the section.

sdk/cli/bin/crunch-job
services/api/script/crunch-dispatch.rb

index 0d35d53f9d2b924ea8b583fda5b5a3a682be09fb..8528dd7a3dfde4bc55716afbb07cff184bc0ffb7 100755 (executable)
@@ -335,13 +335,9 @@ if (!$have_slurm)
   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
 }
 
-
-my $build_script;
-do {
-  local $/ = undef;
-  $build_script = <DATA>;
-};
+my $build_script = handle_readall(\*DATA);
 my $nodelist = join(",", @node);
+my $git_tar_count = 0;
 
 if (!defined $no_clear_tmp) {
   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
@@ -363,8 +359,49 @@ if (!defined $no_clear_tmp) {
   Log (undef, "Cleanup command exited ".exit_status_s($?));
 }
 
+# If this job requires a Docker image, install that.
+my $docker_bin = "/usr/bin/docker.io";
+my ($docker_locator, $docker_stream, $docker_hash);
+if ($docker_locator = $Job->{docker_image_locator}) {
+  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
+  if (!$docker_hash)
+  {
+    croak("No Docker image hash found from locator $docker_locator");
+  }
+  $docker_stream =~ s/^\.//;
+  my $docker_install_script = qq{
+if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
+    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
+fi
+};
+  my $docker_pid = fork();
+  if ($docker_pid == 0)
+  {
+    srun (["srun", "--nodelist=" . join(',', @node)],
+          ["/bin/sh", "-ec", $docker_install_script]);
+    exit ($?);
+  }
+  while (1)
+  {
+    last if $docker_pid == waitpid (-1, WNOHANG);
+    freeze_if_want_freeze ($docker_pid);
+    select (undef, undef, undef, 0.1);
+  }
+  if ($? != 0)
+  {
+    croak("Installing Docker image from $docker_locator exited "
+          .exit_status_s($?));
+  }
+
+  if ($Job->{arvados_sdk_version}) {
+    # The job also specifies an Arvados SDK version.  Add the SDKs to the
+    # tar file for the build script to install.
+    add_git_archive("git", "--git-dir=$git_dir", "archive",
+                    "--prefix=.arvados.sdk/",
+                    $Job->{arvados_sdk_version}, "sdk");
+  }
+}
 
-my $git_archive;
 if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
   # If script_version looks like an absolute path, *and* the --git-dir
   # argument was not given -- which implies we were not invoked by
@@ -518,12 +555,10 @@ else {
   }
 
   $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
-  $git_archive = `$gitcmd archive ''\Q$commit\E`;
-  if ($?) {
-    croak("Error: $gitcmd archive exited ".exit_status_s($?));
-  }
+  add_git_archive("$gitcmd archive ''\Q$commit\E");
 }
 
+my $git_archive = combined_git_archive();
 if (!defined $git_archive) {
   Log(undef, "Skip install phase (no git archive)");
   if ($have_slurm) {
@@ -553,48 +588,10 @@ else {
   }
   my $install_exited = $?;
   Log (undef, "Install script exited ".exit_status_s($install_exited));
-  exit (1) if $install_exited != 0;
-}
-
-if (!$have_slurm)
-{
-  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
-  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
-}
-
-# If this job requires a Docker image, install that.
-my $docker_bin = "/usr/bin/docker.io";
-my ($docker_locator, $docker_stream, $docker_hash);
-if ($docker_locator = $Job->{docker_image_locator}) {
-  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
-  if (!$docker_hash)
-  {
-    croak("No Docker image hash found from locator $docker_locator");
-  }
-  $docker_stream =~ s/^\.//;
-  my $docker_install_script = qq{
-if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
-    arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
-fi
-};
-  my $docker_pid = fork();
-  if ($docker_pid == 0)
-  {
-    srun (["srun", "--nodelist=" . join(',', @node)],
-          ["/bin/sh", "-ec", $docker_install_script]);
-    exit ($?);
-  }
-  while (1)
-  {
-    last if $docker_pid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($docker_pid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($? != 0)
-  {
-    croak("Installing Docker image from $docker_locator exited "
-          .exit_status_s($?));
+  foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
+    unlink($tar_filename);
   }
+  exit (1) if $install_exited != 0;
 }
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -1728,17 +1725,85 @@ sub exit_status_s {
   return $s;
 }
 
+sub handle_readall {
+  # Pass in a glob reference to a file handle.
+  # Read all its contents and return them as a string.
+  my $fh_glob_ref = shift;
+  local $/ = undef;
+  return <$fh_glob_ref>;
+}
+
+sub tar_filename_n {
+  my $n = shift;
+  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
+}
+
+sub add_git_archive {
+  # Pass in a git archive command as a string or list, a la system().
+  # This method will save its output to be included in the archive sent to the
+  # build script.
+  my $git_input;
+  $git_tar_count++;
+  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
+    croak("Failed to save git archive: $!");
+  }
+  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
+  close($git_input);
+  waitpid($git_pid, 0);
+  close(GIT_ARCHIVE);
+  if ($?) {
+    croak("Failed to save git archive: git exited " . exit_status_s($?));
+  }
+}
+
+sub combined_git_archive {
+  # Combine all saved tar archives into a single archive, then return its
+  # contents in a string.  Return undef if no archives have been saved.
+  if ($git_tar_count < 1) {
+    return undef;
+  }
+  my $base_tar_name = tar_filename_n(1);
+  foreach my $tar_to_append (map { tar_filename_n($_); } (2..$git_tar_count)) {
+    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
+    if ($tar_exit != 0) {
+      croak("Error preparing build archive: tar -A exited " .
+            exit_status_s($tar_exit));
+    }
+  }
+  if (!open(GIT_TAR, "<", $base_tar_name)) {
+    croak("Could not open build archive: $!");
+  }
+  my $tar_contents = handle_readall(\*GIT_TAR);
+  close(GIT_TAR);
+  return $tar_contents;
+}
+
 __DATA__
 #!/usr/bin/perl
-
-# checkout-and-build
+#
+# This is crunch-job's internal dispatch script.  crunch-job running on the API
+# server invokes this script on individual compute nodes, or localhost if we're
+# running a job locally.  It gets called in two modes:
+#
+# * No arguments: Installation mode.  Read a tar archive from the DATA
+#   file handle; it includes the Crunch script's source code, and
+#   maybe SDKs as well.  Those should be installed in the proper
+#   locations.
+#
+# * With arguments: Crunch script run mode.  This script should set up the
+#   environment, then run the command specified in the arguments.
 
 use Fcntl ':flock';
 use File::Path qw( make_path remove_tree );
+use POSIX qw(getcwd);
+
+# Map SDK subdirectories to the path environments they belong to.
+my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");
 
 my $destdir = $ENV{"CRUNCH_SRC"};
 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
 my $repo = $ENV{"CRUNCH_SRC_URL"};
+my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
 my $job_work = $ENV{"JOB_WORK"};
 my $task_work = $ENV{"TASK_WORK"};
 
@@ -1753,17 +1818,31 @@ if ($task_work) {
   remove_tree($task_work, {keep_root => 1});
 }
 
-my @git_archive_data = <DATA>;
-if (!@git_archive_data) {
-  # Nothing to extract -> nothing to install.
-  run_argv_and_exit();
+if (@ARGV) {
+  if (-e "$install_dir/bin/activate") {
+    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
+    @ARGV = ("/bin/sh", "-ec",
+             ". \Q$install_dir/bin/activate\E; exec $orig_argv");
+  }
+  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
+    my $sdk_path = "$install_dir/$sdk_dir";
+    if (-d $sdk_path) {
+      if ($ENV{$sdk_envkey}) {
+        $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
+      } else {
+        $ENV{$sdk_envkey} = $sdk_path;
+      }
+    }
+  }
+  exec(@ARGV);
+  die "Cannot exec `@ARGV`: $!";
 }
 
 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
 flock L, LOCK_EX;
 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
   # This version already installed -> nothing to do.
-  run_argv_and_exit();
+  exit(0);
 }
 
 unlink "$destdir.commit";
@@ -1772,21 +1851,31 @@ open STDOUT, ">", "$destdir.log";
 open STDERR, ">&STDOUT";
 
 mkdir $destdir;
-open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
-print TARX @git_archive_data;
+open TARX, "|-", "tar", "-xC", $destdir;
+{
+  local $/ = undef;
+  print TARX <DATA>;
+}
 if(!close(TARX)) {
-  die "'tar -C $destdir -xf -' exited $?: $!";
+  die "'tar -xC $destdir' exited $?: $!";
 }
 
-my $pwd;
-chomp ($pwd = `pwd`);
-my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
 mkdir $install_dir;
 
-for my $src_path ("$destdir/arvados/sdk/python") {
-  if (-d $src_path) {
-    shell_or_die ("virtualenv", $install_dir);
-    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
+my $sdk_root = "$destdir/.arvados.sdk/sdk";
+if (-d $sdk_root) {
+  if (can_run("virtualenv")) {
+    shell_or_die("virtualenv", "--python=python2.7", "--system-site-packages",
+                 $install_dir);
+    shell_or_die("$install_dir/bin/pip", "install", "$sdk_root/python");
+  }
+
+  foreach my $sdk_lang (map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS)) {
+    if (-d "$sdk_root/$sdk_lang") {
+      if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
+        die "Failed to install $sdk_lang SDK: $!";
+      }
+    }
   }
 }
 
@@ -1807,16 +1896,12 @@ if ($commit) {
 
 close L;
 
-run_argv_and_exit();
-
-sub run_argv_and_exit
-{
-  if (@ARGV) {
-    exec(@ARGV);
-    die "Cannot exec `@ARGV`: $!";
-  } else {
-    exit 0;
-  }
+sub can_run {
+  my $command_name = shift;
+  open(my $which, "-|", "which", $command_name);
+  while (<$which>) { }
+  close($which);
+  return ($? == 0);
 }
 
 sub shell_or_die
index ebd5165669f3d396bfa35e266a2e217ef4285ee5..697c85d008036d6b402fcb5ca0cda258cc11a95d 100755 (executable)
@@ -47,16 +47,38 @@ require 'open3'
 class Dispatcher
   include ApplicationHelper
 
+  def initialize
+    @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
+    if @crunch_job_bin.empty?
+      raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
+    end
+
+    @arvados_internal = Rails.configuration.git_internal_dir
+    if not File.exists? @arvados_internal
+      $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
+      raise "No internal git repository available" unless ($? == 0)
+    end
+
+    @repo_root = Rails.configuration.git_repositories_dir
+    @authorizations = {}
+    @did_recently = {}
+    @fetched_commits = {}
+    @git_tags = {}
+    @node_state = {}
+    @pipe_auth_tokens = {}
+    @running = {}
+    @todo = []
+    @todo_pipelines = []
+  end
+
   def sysuser
     return act_as_system_user
   end
 
   def refresh_todo
-    @todo = []
     if $options[:jobs]
       @todo = Job.queue.select(&:repository)
     end
-    @todo_pipelines = []
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
     end
@@ -108,7 +130,6 @@ class Dispatcher
 
   def update_node_status
     return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
-    @node_state ||= {}
     slurm_status.each_pair do |hostname, slurmdata|
       next if @node_state[hostname] == slurmdata
       begin
@@ -217,6 +238,96 @@ class Dispatcher
     end
   end
 
+  def git_cmd(cmd_s)
+    "git --git-dir=#{@arvados_internal.shellescape} #{cmd_s}"
+  end
+
+  def get_authorization(job)
+    if @authorizations[job.uuid] and
+        @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+      # We already made a token for this job, but we need a new one
+      # because modified_by_user_uuid has changed (the job will run
+      # as a different user).
+      @authorizations[job.uuid].update_attributes expires_at: Time.now
+      @authorizations[job.uuid] = nil
+    end
+    if not @authorizations[job.uuid]
+      auth = ApiClientAuthorization.
+        new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+            api_client_id: 0)
+      if not auth.save
+        $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
+      else
+        @authorizations[job.uuid] = auth
+      end
+    end
+    @authorizations[job.uuid]
+  end
+
+  def get_commit(repo_name, commit_hash)
+    # @fetched_commits[V]==true if we know commit V exists in the
+    # arvados_internal git repository.
+    if !@fetched_commits[commit_hash]
+      src_repo = File.join(@repo_root, repo_name + '.git')
+      if not File.exists? src_repo
+        src_repo = File.join(@repo_root, repo_name, '.git')
+        if not File.exists? src_repo
+          fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
+          return nil
+        end
+      end
+
+      # check if the commit needs to be fetched or not
+      commit_rev = `#{git_cmd("rev-list -n1 #{commit_hash.shellescape} 2>/dev/null")}`.chomp
+      unless $? == 0 and commit_rev == commit_hash
+        # commit does not exist in internal repository, so import the source repository using git fetch-pack
+        cmd = git_cmd("fetch-pack --no-progress --all #{src_repo.shellescape}")
+        $stderr.puts "dispatch: #{cmd}"
+        $stderr.puts `#{cmd}`
+        unless $? == 0
+          fail_job job, "git fetch-pack failed"
+          return nil
+        end
+      end
+      @fetched_commits[commit_hash] = true
+    end
+    @fetched_commits[commit_hash]
+  end
+
+  def tag_commit(commit_hash, tag_name)
+    # @git_tags[T]==V if we know commit V has been tagged T in the
+    # arvados_internal repository.
+    if not @git_tags[tag_name]
+      cmd = git_cmd("tag #{tag_name.shellescape} #{commit_hash.shellescape} 2>/dev/null")
+      $stderr.puts "dispatch: #{cmd}"
+      $stderr.puts `#{cmd}`
+      unless $? == 0
+        # git tag failed.  This may be because the tag already exists, so check for that.
+        tag_rev = `#{git_cmd("rev-list -n1 #{tag_name.shellescape}")}`.chomp
+        if $? == 0
+          # We got a revision back
+          if tag_rev != commit_hash
+            # Uh oh, the tag doesn't point to the revision we were expecting.
+            # Someone has been monkeying with the job record and/or git.
+            fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
+            return nil
+          end
+          # we're okay (fall through to setting @git_tags below)
+        else
+          # git rev-list failed for some reason.
+          fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
+          return nil
+        end
+      end
+      # 'git tag' was successful, or there is an existing tag that points to the same revision.
+      @git_tags[tag_name] = commit_hash
+    elsif @git_tags[tag_name] != commit_hash
+      fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
+      return nil
+    end
+    @git_tags[tag_name]
+  end
+
   def start_jobs
     @todo.each do |job|
       next if @running[job.uuid]
@@ -259,108 +370,19 @@ class Dispatcher
                          "GEM_PATH=#{ENV['GEM_PATH']}")
       end
 
-      @authorizations ||= {}
-      if @authorizations[job.uuid] and
-          @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
-        # We already made a token for this job, but we need a new one
-        # because modified_by_user_uuid has changed (the job will run
-        # as a different user).
-        @authorizations[job.uuid].update_attributes expires_at: Time.now
-        @authorizations[job.uuid] = nil
-      end
-      if not @authorizations[job.uuid]
-        auth = ApiClientAuthorization.
-          new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
-              api_client_id: 0)
-        if not auth.save
-          $stderr.puts "dispatch: auth.save failed"
-          next
-        end
-        @authorizations[job.uuid] = auth
-      end
-
-      crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
-      if crunch_job_bin == ''
-        raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
-      end
-
-      arvados_internal = Rails.configuration.git_internal_dir
-      if not File.exists? arvados_internal
-        $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
-      end
-
-      git = "git --git-dir=#{arvados_internal.shellescape}"
-
-      # @fetched_commits[V]==true if we know commit V exists in the
-      # arvados_internal git repository.
-      @fetched_commits ||= {}
-      if !@fetched_commits[job.script_version]
-
-        repo_root = Rails.configuration.git_repositories_dir
-        src_repo = File.join(repo_root, job.repository + '.git')
-        if not File.exists? src_repo
-          src_repo = File.join(repo_root, job.repository, '.git')
-          if not File.exists? src_repo
-            fail_job job, "No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
-            next
-          end
-        end
-
-        # check if the commit needs to be fetched or not
-        commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
-        unless $? == 0 and commit_rev == job.script_version
-          # commit does not exist in internal repository, so import the source repository using git fetch-pack
-          cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
-          $stderr.puts "dispatch: #{cmd}"
-          $stderr.puts `#{cmd}`
-          unless $? == 0
-            fail_job job, "git fetch-pack failed"
-            next
-          end
-        end
-        @fetched_commits[job.script_version] = true
-      end
-
-      # @job_tags[J]==V if we know commit V has been tagged J in the
-      # arvados_internal repository. (J is a job UUID, V is a commit
-      # sha1.)
-      @job_tags ||= {}
-      if not @job_tags[job.uuid]
-        cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape} 2>/dev/null"
-        $stderr.puts "dispatch: #{cmd}"
-        $stderr.puts `#{cmd}`
-        unless $? == 0
-          # git tag failed.  This may be because the tag already exists, so check for that.
-          tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape}`.chomp
-          if $? == 0
-            # We got a revision back
-            if tag_rev != job.script_version
-              # Uh oh, the tag doesn't point to the revision we were expecting.
-              # Someone has been monkeying with the job record and/or git.
-              fail_job job, "Existing tag #{job.uuid} points to commit #{tag_rev} but expected commit #{job.script_version}"
-              next
-            end
-            # we're okay (fall through to setting @job_tags below)
-          else
-            # git rev-list failed for some reason.
-            fail_job job, "'git tag' for #{job.uuid} failed but did not find any existing tag using 'git rev-list'"
-            next
-          end
-        end
-        # 'git tag' was successful, or there is an existing tag that points to the same revision.
-        @job_tags[job.uuid] = job.script_version
-      elsif @job_tags[job.uuid] != job.script_version
-        fail_job job, "Existing tag #{job.uuid} points to commit #{@job_tags[job.uuid]} but this job uses commit #{job.script_version}"
-        next
+      ready = (get_authorization(job) and
+               get_commit(job.repository, job.script_version) and
+               tag_commit(job.script_version, job.uuid))
+      if ready and job.arvados_sdk_version
+        ready = (get_commit("arvados", job.arvados_sdk_version) and
+                 tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
       end
+      next unless ready
 
-      cmd_args << crunch_job_bin
-      cmd_args << '--job-api-token'
-      cmd_args << @authorizations[job.uuid].api_token
-      cmd_args << '--job'
-      cmd_args << job.uuid
-      cmd_args << '--git-dir'
-      cmd_args << arvados_internal
+      cmd_args += [@crunch_job_bin,
+                   '--job-api-token', @authorizations[job.uuid].api_token,
+                   '--job', job.uuid,
+                   '--git-dir', @arvados_internal]
 
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
@@ -652,8 +674,6 @@ class Dispatcher
 
   def run
     act_as_system_user
-    @running ||= {}
-    @pipe_auth_tokens ||= { }
     $stderr.puts "dispatch: ready"
     while !$signal[:term] or @running.size > 0
       read_pipes
@@ -688,7 +708,6 @@ class Dispatcher
   protected
 
   def did_recently(thing, min_interval)
-    @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
       @did_recently[thing] = Time.now
       false