4027: Crunch installs jobs' requested arvados_sdk_version.
[arvados.git] / services / api / script / crunch-dispatch.rb
index d83485edba0351414f313b51c5c186de2f56eded..ab4f70e60bd1115a87a47eeed64a544859bf49fd 100755 (executable)
@@ -53,16 +53,38 @@ end
 class Dispatcher
   include ApplicationHelper
 
+  def initialize
+    @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
+    if @crunch_job_bin.empty?
+      raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
+    end
+
+    @arvados_internal = Rails.configuration.git_internal_dir
+    if not File.exists? @arvados_internal
+      $stderr.puts `mkdir -p #{@arvados_internal.shellescape} && git init --bare #{@arvados_internal.shellescape}`
+      raise "No internal git repository available" unless ($? == 0)
+    end
+
+    @repo_root = Rails.configuration.git_repositories_dir
+    @authorizations = {}
+    @did_recently = {}
+    @fetched_commits = {}
+    @git_tags = {}
+    @node_state = {}
+    @pipe_auth_tokens = {}
+    @running = {}
+    @todo = []
+    @todo_pipelines = []
+  end
+
   def sysuser
     return act_as_system_user
   end
 
   def refresh_todo
-    @todo = []
     if $options[:jobs]
       @todo = Job.queue.select(&:repository)
     end
-    @todo_pipelines = []
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
     end
@@ -114,7 +136,6 @@ class Dispatcher
 
   def update_node_status
     return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
-    @node_state ||= {}
     slurm_status.each_pair do |hostname, slurmdata|
       next if @node_state[hostname] == slurmdata
       begin
@@ -223,6 +244,103 @@ class Dispatcher
     end
   end
 
+  def stdout_s(cmd_a, opts={})
+    IO.popen(cmd_a, "r", opts) do |pipe|
+      return pipe.read.chomp
+    end
+  end
+
+  def git_cmd(*cmd_a)
+    ["git", "--git-dir=#{@arvados_internal}"] + cmd_a
+  end
+
+  def get_authorization(job)
+    if @authorizations[job.uuid] and
+        @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
+      # We already made a token for this job, but we need a new one
+      # because modified_by_user_uuid has changed (the job will run
+      # as a different user).
+      @authorizations[job.uuid].update_attributes expires_at: Time.now
+      @authorizations[job.uuid] = nil
+    end
+    if not @authorizations[job.uuid]
+      auth = ApiClientAuthorization.
+        new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
+            api_client_id: 0)
+      if not auth.save
+        $stderr.puts "dispatch: auth.save failed for #{job.uuid}"
+      else
+        @authorizations[job.uuid] = auth
+      end
+    end
+    @authorizations[job.uuid]
+  end
+
+  def get_commit(repo_name, commit_hash)
+    # @fetched_commits[V]==true if we know commit V exists in the
+    # arvados_internal git repository.
+    if !@fetched_commits[commit_hash]
+      src_repo = File.join(@repo_root, "#{repo_name}.git")
+      if not File.exists? src_repo
+        src_repo = File.join(@repo_root, repo_name, '.git')
+        if not File.exists? src_repo
+          fail_job job, "No #{repo_name}.git or #{repo_name}/.git at #{@repo_root}"
+          return nil
+        end
+      end
+
+      # check if the commit needs to be fetched or not
+      commit_rev = stdout_s(git_cmd("rev-list", "-n1", commit_hash),
+                            err: "/dev/null")
+      unless $? == 0 and commit_rev == commit_hash
+        # commit does not exist in internal repository, so import the source repository using git fetch-pack
+        cmd = git_cmd("fetch-pack", "--no-progress", "--all", src_repo)
+        $stderr.puts "dispatch: #{cmd}"
+        $stderr.puts(stdout_s(cmd))
+        unless $? == 0
+          fail_job job, "git fetch-pack failed"
+          return nil
+        end
+      end
+      @fetched_commits[commit_hash] = true
+    end
+    @fetched_commits[commit_hash]
+  end
+
+  def tag_commit(commit_hash, tag_name)
+    # @git_tags[T]==V if we know commit V has been tagged T in the
+    # arvados_internal repository.
+    if not @git_tags[tag_name]
+      cmd = git_cmd("tag", tag_name, commit_hash)
+      $stderr.puts "dispatch: #{cmd}"
+      $stderr.puts(stdout_s(cmd, err: "/dev/null"))
+      unless $? == 0
+        # git tag failed.  This may be because the tag already exists, so check for that.
+        tag_rev = stdout_s(git_cmd("rev-list", "-n1", tag_name))
+        if $? == 0
+          # We got a revision back
+          if tag_rev != commit_hash
+            # Uh oh, the tag doesn't point to the revision we were expecting.
+            # Someone has been monkeying with the job record and/or git.
+            fail_job job, "Existing tag #{tag_name} points to commit #{tag_rev} but expected commit #{commit_hash}"
+            return nil
+          end
+          # we're okay (fall through to setting @git_tags below)
+        else
+          # git rev-list failed for some reason.
+          fail_job job, "'git tag' for #{tag_name} failed but did not find any existing tag using 'git rev-list'"
+          return nil
+        end
+      end
+      # 'git tag' was successful, or there is an existing tag that points to the same revision.
+      @git_tags[tag_name] = commit_hash
+    elsif @git_tags[tag_name] != commit_hash
+      fail_job job, "Existing tag #{tag_name} points to commit #{@git_tags[tag_name]} but this job uses commit #{commit_hash}"
+      return nil
+    end
+    @git_tags[tag_name]
+  end
+
   def start_jobs
     @todo.each do |job|
       next if @running[job.uuid]
@@ -265,108 +383,19 @@ class Dispatcher
                          "GEM_PATH=#{ENV['GEM_PATH']}")
       end
 
-      @authorizations ||= {}
-      if @authorizations[job.uuid] and
-          @authorizations[job.uuid].user.uuid != job.modified_by_user_uuid
-        # We already made a token for this job, but we need a new one
-        # because modified_by_user_uuid has changed (the job will run
-        # as a different user).
-        @authorizations[job.uuid].update_attributes expires_at: Time.now
-        @authorizations[job.uuid] = nil
-      end
-      if not @authorizations[job.uuid]
-        auth = ApiClientAuthorization.
-          new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
-              api_client_id: 0)
-        if not auth.save
-          $stderr.puts "dispatch: auth.save failed"
-          next
-        end
-        @authorizations[job.uuid] = auth
-      end
-
-      crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
-      if crunch_job_bin == ''
-        raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
-      end
-
-      arvados_internal = Rails.configuration.git_internal_dir
-      if not File.exists? arvados_internal
-        $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
-      end
-
-      git = "git --git-dir=#{arvados_internal.shellescape}"
-
-      # @fetched_commits[V]==true if we know commit V exists in the
-      # arvados_internal git repository.
-      @fetched_commits ||= {}
-      if !@fetched_commits[job.script_version]
-
-        repo_root = Rails.configuration.git_repositories_dir
-        src_repo = File.join(repo_root, job.repository + '.git')
-        if not File.exists? src_repo
-          src_repo = File.join(repo_root, job.repository, '.git')
-          if not File.exists? src_repo
-            fail_job job, "No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
-            next
-          end
-        end
-
-        # check if the commit needs to be fetched or not
-        commit_rev = `#{git} rev-list -n1 #{job.script_version.shellescape} 2>/dev/null`.chomp
-        unless $? == 0 and commit_rev == job.script_version
-          # commit does not exist in internal repository, so import the source repository using git fetch-pack
-          cmd = "#{git} fetch-pack --no-progress --all #{src_repo.shellescape}"
-          $stderr.puts "dispatch: #{cmd}"
-          $stderr.puts `#{cmd}`
-          unless $? == 0
-            fail_job job, "git fetch-pack failed"
-            next
-          end
-        end
-        @fetched_commits[job.script_version] = true
-      end
-
-      # @job_tags[J]==V if we know commit V has been tagged J in the
-      # arvados_internal repository. (J is a job UUID, V is a commit
-      # sha1.)
-      @job_tags ||= {}
-      if not @job_tags[job.uuid]
-        cmd = "#{git} tag #{job.uuid.shellescape} #{job.script_version.shellescape} 2>/dev/null"
-        $stderr.puts "dispatch: #{cmd}"
-        $stderr.puts `#{cmd}`
-        unless $? == 0
-          # git tag failed.  This may be because the tag already exists, so check for that.
-          tag_rev = `#{git} rev-list -n1 #{job.uuid.shellescape}`.chomp
-          if $? == 0
-            # We got a revision back
-            if tag_rev != job.script_version
-              # Uh oh, the tag doesn't point to the revision we were expecting.
-              # Someone has been monkeying with the job record and/or git.
-              fail_job job, "Existing tag #{job.uuid} points to commit #{tag_rev} but expected commit #{job.script_version}"
-              next
-            end
-            # we're okay (fall through to setting @job_tags below)
-          else
-            # git rev-list failed for some reason.
-            fail_job job, "'git tag' for #{job.uuid} failed but did not find any existing tag using 'git rev-list'"
-            next
-          end
-        end
-        # 'git tag' was successful, or there is an existing tag that points to the same revision.
-        @job_tags[job.uuid] = job.script_version
-      elsif @job_tags[job.uuid] != job.script_version
-        fail_job job, "Existing tag #{job.uuid} points to commit #{@job_tags[job.uuid]} but this job uses commit #{job.script_version}"
-        next
+      ready = (get_authorization(job) and
+               get_commit(job.repository, job.script_version) and
+               tag_commit(job.script_version, job.uuid))
+      if ready and job.arvados_sdk_version
+        ready = (get_commit("arvados", job.arvados_sdk_version) and
+                 tag_commit(job.arvados_sdk_version, "#{job.uuid}-arvados-sdk"))
       end
+      next unless ready
 
-      cmd_args << crunch_job_bin
-      cmd_args << '--job-api-token'
-      cmd_args << @authorizations[job.uuid].api_token
-      cmd_args << '--job'
-      cmd_args << job.uuid
-      cmd_args << '--git-dir'
-      cmd_args << arvados_internal
+      cmd_args += [@crunch_job_bin,
+                   '--job-api-token', @authorizations[job.uuid].api_token,
+                   '--job', job.uuid,
+                   '--git-dir', @arvados_internal]
 
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
@@ -658,8 +687,6 @@ class Dispatcher
 
   def run
     act_as_system_user
-    @running ||= {}
-    @pipe_auth_tokens ||= { }
     $stderr.puts "dispatch: ready"
     while !$signal[:term] or @running.size > 0
       read_pipes
@@ -694,7 +721,6 @@ class Dispatcher
   protected
 
   def did_recently(thing, min_interval)
-    @did_recently ||= {}
     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
       @did_recently[thing] = Time.now
       false