Merge branch '10649-squeue-stderr' closes #10649
[arvados.git] / services / api / app / models / job.rb
index 41a16bfd5da64bcf6604c4dfae83723068bd40f5..ef3d0b5e1028b41a2bab4d63aa300efb4743bc0a 100644 (file)
@@ -2,6 +2,8 @@ class Job < ArvadosModel
   include HasUuid
   include KindAndEtag
   include CommonApiTemplate
+  extend CurrentApiClient
+  serialize :components, Hash
   attr_protected :arvados_sdk_version, :docker_image_locator
   serialize :script_parameters, Hash
   serialize :runtime_constraints, Hash
@@ -10,9 +12,10 @@ class Job < ArvadosModel
   after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update
   before_validation :set_priority
   before_validation :update_state_from_old_state_attrs
+  before_validation :update_script_parameters_digest
   validate :ensure_script_version_is_commit
-  validate :find_arvados_sdk_version
   validate :find_docker_image_locator
+  validate :find_arvados_sdk_version
   validate :validate_status
   validate :validate_state_change
   validate :ensure_no_collection_uuids_in_script_params
@@ -52,6 +55,7 @@ class Job < ArvadosModel
     t.add :queue_position
     t.add :node_uuids
     t.add :description
+    t.add :components
   end
 
   # Supported states for a job
@@ -63,6 +67,10 @@ class Job < ArvadosModel
             (Complete = 'Complete'),
            ]
 
+  # Set @need_crunch_dispatch_trigger to false on every record as soon as
+  # it has been instantiated.
+  after_initialize { @need_crunch_dispatch_trigger = false }
+
   def assert_finished
     update_attributes(finished_at: finished_at || db_current_time,
                       success: success.nil? ? false : success,
@@ -78,12 +86,13 @@ class Job < ArvadosModel
   end
 
   def queue_position
-    Job::queue.each_with_index do |job, index|
-      if job[:uuid] == self.uuid
-        return index
-      end
-    end
-    nil
+    # Returns 0 for a job in the Queued state and nil for any other state.
+    #
+    # We used to report this accurately, but the implementation made queue
+    # API requests O(n**2) for the size of the queue.  See #8800.
+    # We've soft-disabled it because it's not clear we even want this
+    # functionality: now that we have Node Manager with support for multiple
+    # node sizes, "queue position" tells you very little about when a job will
+    # run.
+    state == Queued ? 0 : nil
   end
 
   def self.running
@@ -92,8 +101,7 @@ class Job < ArvadosModel
   end
 
   def lock locked_by_uuid
-    transaction do
-      self.reload
+    with_lock do
       unless self.state == Queued and self.is_locked_by_uuid.nil?
         raise AlreadyLockedError
       end
@@ -103,8 +111,181 @@ class Job < ArvadosModel
     end
   end
 
+  # before_validation hook: recompute script_parameters_digest from the
+  # current script_parameters using the class-level sorted_hash_digest.
+  def update_script_parameters_digest
+    digest = self.class.sorted_hash_digest(script_parameters)
+    self.script_parameters_digest = digest
+  end
+
+  # Report the superclass's searchable columns for `operator`, minus
+  # script_parameters_digest.
+  def self.searchable_columns operator
+    hidden_columns = ["script_parameters_digest"]
+    super - hidden_columns
+  end
+
+  # Translate Job-specific filter operators into filters the generic query
+  # code understands.
+  #
+  # attrs        - job attributes; only attrs[:script_version] is read here,
+  #                as the upper bound of a script_version git filter.
+  # orig_filters - array of [attr, operator, operand] triples.
+  # read_users   - passed to Collection.find_all_for_docker_image.
+  #
+  # "in git" / "not in git" filters are collapsed into one
+  # [attr, "in", revisions] filter per attribute; "in docker" /
+  # "not in docker" become "in" / "not in" filters over portable data
+  # hashes; every other filter is passed through unchanged.
+  #
+  # Raises ArgumentError on conflicting repository/script "=" filters, on a
+  # script_version git filter without both of those, on a git filter for an
+  # attribute other than script_version / arvados_sdk_version, or when the
+  # resulting commit range is empty.
+  def self.load_job_specific_filters attrs, orig_filters, read_users
+    # Convert Job-specific @filters entries into general SQL filters.
+    script_info = {"repository" => nil, "script" => nil}
+    # Per-attribute git range spec, created on first access with defaults.
+    git_filters = Hash.new do |hash, key|
+      hash[key] = {"max_version" => "HEAD", "exclude_versions" => []}
+    end
+    filters = []
+    orig_filters.each do |attr, operator, operand|
+      # Remember "=" filters on repository/script (the script_version git
+      # filter below needs them) and reject contradictory duplicates.
+      if (script_info.has_key? attr) and (operator == "=")
+        if script_info[attr].nil?
+          script_info[attr] = operand
+        elsif script_info[attr] != operand
+          raise ArgumentError.new("incompatible #{attr} filters")
+        end
+      end
+      case operator
+      when "in git"
+        git_filters[attr]["min_version"] = operand
+      when "not in git"
+        git_filters[attr]["exclude_versions"] += Array.wrap(operand)
+      when "in docker", "not in docker"
+        image_hashes = Array.wrap(operand).flat_map do |search_term|
+          # Split "name:tag" into the name and an optional tag.
+          image_search, image_tag = search_term.split(':', 2)
+          Collection.
+            find_all_for_docker_image(image_search, image_tag, read_users).
+            map(&:portable_data_hash)
+        end
+        # Drop the " docker" suffix: "in docker" -> "in",
+        # "not in docker" -> "not in".
+        filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
+      else
+        filters << [attr, operator, operand]
+      end
+    end
+
+    # Build a real script_version filter from any "not? in git" filters.
+    git_filters.each_pair do |attr, filter|
+      case attr
+      when "script_version"
+        script_info.each_pair do |key, value|
+          if value.nil?
+            raise ArgumentError.new("script_version filter needs #{key} filter")
+          end
+        end
+        filter["repository"] = script_info["repository"]
+        if attrs[:script_version]
+          filter["max_version"] = attrs[:script_version]
+        else
+          # Using HEAD, set earlier by the hash default, is fine.
+        end
+      when "arvados_sdk_version"
+        filter["repository"] = "arvados"
+      else
+        raise ArgumentError.new("unknown attribute for git filter: #{attr}")
+      end
+      revisions = Commit.find_commit_range(filter["repository"],
+                                           filter["min_version"],
+                                           filter["max_version"],
+                                           filter["exclude_versions"])
+      if revisions.empty?
+        raise ArgumentError.
+          new("error searching #{filter['repository']} from " +
+              "'#{filter['min_version']}' to '#{filter['max_version']}', " +
+              "excluding #{filter['exclude_versions']}")
+      end
+      filters.append([attr, "in", revisions])
+    end
+
+    filters
+  end
+
+  # Look for an existing job that can be reused in place of a new job
+  # described by attrs.
+  #
+  # attrs      - job attributes; this method reads :repository, :script,
+  #              :script_version, :runtime_constraints and
+  #              :script_parameters.
+  # params     - request parameters; :exclude_script_versions and
+  #              :minimum_script_version are consulted only when filters
+  #              is empty.
+  # filters    - array of [attr, operator, operand] triples.  When empty,
+  #              an equivalent list is derived from attrs and params.
+  # read_users - passed through to load_job_specific_filters.
+  #
+  # Returns a Complete job whose output is readable by current_user,
+  # provided every other matching Complete job has the same output;
+  # failing that, the first matching Queued/Running job owned by
+  # current_user; otherwise nil.
+  #
+  # Raises ArgumentError if the filters do not include both a "repository"
+  # and a "script" filter.
+  def self.find_reusable attrs, params, filters, read_users
+    if filters.empty?  # Translate older creation parameters into filters.
+      filters =
+        [["repository", "=", attrs[:repository]],
+         ["script", "=", attrs[:script]],
+         ["script_version", "not in git", params[:exclude_script_versions]],
+        ].reject { |filter| filter.last.nil? or filter.last.empty? }
+      if !params[:minimum_script_version].blank?
+        filters << ["script_version", "in git",
+                     params[:minimum_script_version]]
+      else
+        filters += default_git_filters("script_version", attrs[:repository],
+                                       attrs[:script_version])
+      end
+      if image_search = attrs[:runtime_constraints].andand["docker_image"]
+        if image_tag = attrs[:runtime_constraints]["docker_image_tag"]
+          image_search += ":#{image_tag}"
+        end
+        image_locator = Collection.
+          for_latest_docker_image(image_search).andand.portable_data_hash
+      else
+        image_locator = nil
+      end
+      filters << ["docker_image_locator", "=", image_locator]
+      if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"]
+        filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version)
+      end
+      filters = load_job_specific_filters(attrs, filters, read_users)
+    end
+
+    # Check specified filters for some reasonableness.
+    filter_names = filters.map { |f| f.first }.uniq
+    ["repository", "script"].each do |req_filter|
+      if not filter_names.include?(req_filter)
+        # Raise, as load_job_specific_filters does for bad filters.  This
+        # used to be `return send_error(...)`, but no send_error helper is
+        # visible in this class, so that path could not report the problem.
+        raise ArgumentError.new("#{req_filter} filter required")
+      end
+    end
+
+    # Search for a reusable Job, and return it if found.
+    candidates = Job.
+      readable_by(current_user).
+      where('state = ? or (owner_uuid = ? and state in (?))',
+            Job::Complete, current_user.uuid, [Job::Queued, Job::Running]).
+      where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])).
+      where('nondeterministic is distinct from ?', true).
+      order('state desc, created_at') # prefer Running jobs over Queued
+    candidates = apply_filters candidates, filters
+    # chosen is nil while undecided, a Job once one has been picked, and
+    # false once reuse of any completed job has been ruled out.
+    chosen = nil
+    incomplete_job = nil
+    candidates.each do |j|
+      if j.state != Job::Complete
+        # We'll use this if we don't find a job that has completed
+        incomplete_job ||= j
+        next
+      end
+
+      if chosen == false
+        # We have already decided not to reuse any completed job
+        next
+      elsif chosen
+        if chosen.output != j.output
+          # If two matching jobs produced different outputs, run a new
+          # job (or use one that's already running/queued) instead of
+          # choosing one arbitrarily.
+          chosen = false
+        end
+        # ...and that's the only thing we need to do once we've chosen
+        # a job to reuse.
+      elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
+        # As soon as the output we will end up returning (if any) is
+        # decided, check whether it will be visible to the user; if
+        # not, any further investigation of reusable jobs is futile.
+        chosen = false
+      else
+        chosen = j
+      end
+    end
+    chosen || incomplete_job
+  end
+
+  # Build the filter list that pins `attr_name` to the first commit
+  # Commit.find_commit_range reports for `refspec` in `repo_name`.
+  # Returns [[attr_name, "=", commit]], or an empty list when refspec
+  # can't be resolved to any commit.
+  def self.default_git_filters(attr_name, repo_name, refspec)
+    resolved = Commit.find_commit_range(repo_name, nil, refspec, nil).first
+    return [] unless resolved
+    [[attr_name, "=", resolved]]
+  end
+
   protected
 
+  # MD5 hex digest of the Oj dump of `h` after it has been passed through
+  # deep_sort_hash.
+  # NOTE(review): the `protected` keyword above only affects instance
+  # methods; a `def self.` method defined here remains publicly callable.
+  def self.sorted_hash_digest h
+    canonical = deep_sort_hash(h)
+    Digest::MD5.hexdigest(Oj.dump(canonical))
+  end
+
   def foreign_key_attributes
     super + %w(output log)
   end
@@ -125,7 +306,7 @@ class Job < ArvadosModel
   end
 
   def ensure_script_version_is_commit
-    if self.state == Running
+    if state == Running
       # Apparently client has already decided to go for it. This is
       # needed to run a local job using a local working directory
       # instead of a commit-ish.
@@ -147,16 +328,19 @@ class Job < ArvadosModel
   end
 
   def tag_version_in_internal_repository
-    if self.state == Running
+    if state == Running
       # No point now. See ensure_script_version_is_commit.
       true
+    elsif errors.any?
+      # Won't be saved, and script_version might not even be valid.
+      true
     elsif new_record? or repository_changed? or script_version_changed?
       uuid_was = uuid
       begin
         assign_uuid
         Commit.tag_in_internal_repository repository, script_version, uuid
       rescue
-        uuid = uuid_was
+        self.uuid = uuid_was
         raise
       end
     end
@@ -202,6 +386,10 @@ class Job < ArvadosModel
   end
 
   def find_docker_image_locator
+    runtime_constraints['docker_image'] =
+        Rails.configuration.default_docker_image_for_jobs if ((runtime_constraints.is_a? Hash) and
+                                                              (runtime_constraints['docker_image']).nil? and
+                                                              Rails.configuration.default_docker_image_for_jobs)
     resolve_runtime_constraint("docker_image",
                                :docker_image_locator) do |image_search|
       image_tag = runtime_constraints['docker_image_tag']
@@ -231,7 +419,8 @@ class Job < ArvadosModel
           output_changed? or
           log_changed? or
           tasks_summary_changed? or
-          state_changed?
+          state_changed? or
+          components_changed?
         logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
         return false
       end
@@ -380,24 +569,6 @@ class Job < ArvadosModel
   end
 
   def ensure_no_collection_uuids_in_script_params
-    # recursive_hash_search searches recursively through hashes and
-    # arrays in 'thing' for string fields matching regular expression
-    # 'pattern'.  Returns true if pattern is found, false otherwise.
-    def recursive_hash_search thing, pattern
-      if thing.is_a? Hash
-        thing.each do |k, v|
-          return true if recursive_hash_search v, pattern
-        end
-      elsif thing.is_a? Array
-        thing.each do |k|
-          return true if recursive_hash_search k, pattern
-        end
-      elsif thing.is_a? String
-        return true if thing.match pattern
-      end
-      false
-    end
-
     # Fail validation if any script_parameters field includes a string containing a
     # collection uuid pattern.
     if self.script_parameters_changed?
@@ -408,4 +579,22 @@ class Job < ArvadosModel
     end
     true
   end
+
+  # Recursively look through the values of hashes and the elements of
+  # arrays in 'thing' for a String matching the regular expression
+  # 'pattern'.  Hash keys are not examined.  Returns true as soon as a
+  # match is found, false otherwise (including for non-String leaves).
+  def recursive_hash_search thing, pattern
+    case thing
+    when Hash
+      thing.each_value.any? { |value| recursive_hash_search(value, pattern) }
+    when Array
+      thing.any? { |element| recursive_hash_search(element, pattern) }
+    when String
+      thing.match(pattern) ? true : false
+    else
+      false
+    end
+  end
 end