11345: Clamp retry-after to (0, max_retry_wait). Deindent retry_wrapper a bit for...
[arvados.git] / services / api / app / models / job.rb
index 248d16a4ef5c8fbb8924b2c286369468db453495..fa38ece244afb52b2c2159f8cad6950f6b16468c 100644 (file)
@@ -1,7 +1,12 @@
+require 'log_reuse_info'
+require 'safe_json'
+
 class Job < ArvadosModel
   include HasUuid
   include KindAndEtag
   include CommonApiTemplate
+  extend CurrentApiClient
+  extend LogReuseInfo
   serialize :components, Hash
   attr_protected :arvados_sdk_version, :docker_image_locator
   serialize :script_parameters, Hash
@@ -66,6 +71,14 @@ class Job < ArvadosModel
             (Complete = 'Complete'),
            ]
 
+  # Every Job instance begins with the crunch dispatch trigger flag cleared.
+  after_initialize { @need_crunch_dispatch_trigger = false }
+
+  # Column list for index-read limiting (hook presumably consumed by ArvadosModel; confirm).
+  def self.limit_index_columns_read
+    %w[components]
+  end
+
   def assert_finished
     update_attributes(finished_at: finished_at || db_current_time,
                       success: success.nil? ? false : success,
@@ -114,6 +127,10 @@ class Job < ArvadosModel
     super - ["script_parameters_digest"]
   end
 
+  def self.full_text_searchable_columns
+    super - %w[script_parameters_digest]
+  end
+
   def self.load_job_specific_filters attrs, orig_filters, read_users
     # Convert Job-specific @filters entries into general SQL filters.
     script_info = {"repository" => nil, "script" => nil}
@@ -138,7 +155,7 @@ class Job < ArvadosModel
         image_hashes = Array.wrap(operand).flat_map do |search_term|
           image_search, image_tag = search_term.split(':', 2)
           Collection.
-            find_all_for_docker_image(image_search, image_tag, read_users).
+            find_all_for_docker_image(image_search, image_tag, read_users, filter_compatible_format: false).
             map(&:portable_data_hash)
         end
         filters << [attr, operator.sub(/ docker$/, ""), image_hashes]
@@ -222,45 +239,83 @@ class Job < ArvadosModel
     end
 
     # Search for a reusable Job, and return it if found.
-    candidates = Job.
-      readable_by(current_user).
-      where('state = ? or (owner_uuid = ? and state in (?))',
-            Job::Complete, current_user.uuid, [Job::Queued, Job::Running]).
-      where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])).
-      where('nondeterministic is distinct from ?', true).
-      order('state desc, created_at') # prefer Running jobs over Queued
+    candidates = Job.readable_by(current_user)
+    log_reuse_info { "starting with #{candidates.count} jobs readable by current user #{current_user.uuid}" }
+
+    candidates = candidates.where(
+      'state = ? or (owner_uuid = ? and state in (?))',
+      Job::Complete, current_user.uuid, [Job::Queued, Job::Running])
+    log_reuse_info(candidates) { "after filtering on job state ((state=Complete) or (state=Queued/Running and (submitted by current user)))" }
+
+    digest = Job.sorted_hash_digest(attrs[:script_parameters])
+    candidates = candidates.where('script_parameters_digest = ?', digest)
+    log_reuse_info(candidates) { "after filtering on script_parameters_digest #{digest}" }
+
+    candidates = candidates.where('nondeterministic is distinct from ?', true)
+    log_reuse_info(candidates) { "after filtering on !nondeterministic" }
+
+    # prefer Running jobs over Queued
+    candidates = candidates.order('state desc, created_at')
+
     candidates = apply_filters candidates, filters
+    log_reuse_info(candidates) { "after filtering on repo, script, and custom filters #{filters.inspect}" }
+
     chosen = nil
     incomplete_job = nil
     candidates.each do |j|
       if j.state != Job::Complete
-        # We'll use this if we don't find a job that has completed
-        incomplete_job ||= j
-        next
-      end
-
-      if chosen == false
-        # We have already decided not to reuse any completed job
-        next
+        if !incomplete_job
+          # We'll use this if we don't find a job that has completed
+          log_reuse_info { "job #{j.uuid} is reusable, but unfinished; continuing search for completed jobs" }
+          incomplete_job = j
+        else
+          log_reuse_info { "job #{j.uuid} is unfinished and we already have #{incomplete_job.uuid}; ignoring" }
+        end
+      elsif chosen == false
+        # Ignore: we have already decided not to reuse any completed
+        # job.
+        log_reuse_info { "job #{j.uuid} with output #{j.output} ignored, see above" }
+      elsif Rails.configuration.reuse_job_if_outputs_differ
+        if Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
+          log_reuse_info { "job #{j.uuid} with output #{j.output} is reusable; decision is final." }
+          return j
+        else
+          # Ignore: keep looking for an incomplete job or one whose
+          # output is readable.
+          log_reuse_info { "job #{j.uuid} output #{j.output} unavailable to user; continuing search" }
+        end
       elsif chosen
         if chosen.output != j.output
           # If two matching jobs produced different outputs, run a new
           # job (or use one that's already running/queued) instead of
           # choosing one arbitrarily.
+          log_reuse_info { "job #{j.uuid} output #{j.output} disagrees; forgetting about #{chosen.uuid} and ignoring any other finished jobs (see reuse_job_if_outputs_differ in application.default.yml)" }
           chosen = false
+        else
+          log_reuse_info { "job #{j.uuid} output #{j.output} agrees with chosen #{chosen.uuid}; continuing search in case other candidates have different outputs" }
         end
         # ...and that's the only thing we need to do once we've chosen
         # a job to reuse.
       elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output)
-        # As soon as the output we will end up returning (if any) is
-        # decided, check whether it will be visible to the user; if
-        # not, any further investigation of reusable jobs is futile.
+        # This user cannot read the output of this job. Any other
+        # completed job will have either the same output (making it
+        # unusable) or a different output (making it unusable because
+        # reuse_job_if_outputs_differ is turned off). Therefore,
+        # any further investigation of reusable jobs is futile.
+        log_reuse_info { "job #{j.uuid} output #{j.output} is unavailable to user; this means no finished job can be reused (see reuse_job_if_outputs_differ in application.default.yml)" }
         chosen = false
       else
+        log_reuse_info { "job #{j.uuid} with output #{j.output} can be reused; continuing search in case other candidates have different outputs" }
         chosen = j
       end
     end
-    chosen || incomplete_job
+    j = chosen || incomplete_job
+    if j
+      log_reuse_info { "done, #{j.uuid} was selected" }
+    else
+      log_reuse_info { "done, nothing suitable" }
+    end
+    return j
   end
 
   def self.default_git_filters(attr_name, repo_name, refspec)
@@ -275,19 +330,45 @@ class Job < ArvadosModel
     end
   end
 
+  def cancel(cascade: false, need_transaction: true)
+    # Unless the caller opts out, the whole cancel (cascade included) runs in one transaction.
+    if need_transaction
+      ActiveRecord::Base.transaction { cancel(cascade: cascade, need_transaction: false) }
+      return
+    end
+
+    case state
+    when Queued, Running
+      self.state = Cancelled
+      save!
+    when Cancelled
+      # Already cancelled: nothing to save, but cascading still applies.
+    else
+      raise InvalidStateTransitionError
+    end
+
+    return unless cascade
+
+    # Values of the components hash are child UUIDs (jobs or pipeline instances).
+    child_uuids = components.andand.values.compact
+    return if child_uuids.empty?
+
+    Job.where(uuid: child_uuids, state: [Queued, Running]).each do |child_job|
+      child_job.cancel(cascade: cascade, need_transaction: false)
+    end
+
+    pipeline_states = [PipelineInstance::RunningOnServer, PipelineInstance::RunningOnClient]
+    PipelineInstance.where(uuid: child_uuids, state: pipeline_states).each do |child_pi|
+      child_pi.cancel(cascade: cascade, need_transaction: false)
+    end
+  end
+
   protected
 
   def self.sorted_hash_digest h
     Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h)))
   end
 
-  def self.deep_sort_hash h
-    return h unless h.is_a? Hash
-    h.sort.collect do |k, v|
-      [k, deep_sort_hash(v)]
-    end.to_h
-  end
-
   def foreign_key_attributes
     super + %w(output log)
   end
@@ -342,7 +423,7 @@ class Job < ArvadosModel
         assign_uuid
         Commit.tag_in_internal_repository repository, script_version, uuid
       rescue
-        uuid = uuid_was
+        self.uuid = uuid_was
         raise
       end
     end
@@ -388,10 +469,11 @@ class Job < ArvadosModel
   end
 
   def find_docker_image_locator
-    runtime_constraints['docker_image'] =
-        Rails.configuration.default_docker_image_for_jobs if ((runtime_constraints.is_a? Hash) and
-                                                              (runtime_constraints['docker_image']).nil? and
-                                                              Rails.configuration.default_docker_image_for_jobs)
+    if runtime_constraints.is_a? Hash
+      runtime_constraints['docker_image'] ||=
+        Rails.configuration.default_docker_image_for_jobs
+    end
+
     resolve_runtime_constraint("docker_image",
                                :docker_image_locator) do |image_search|
       image_tag = runtime_constraints['docker_image_tag']
@@ -421,7 +503,7 @@ class Job < ArvadosModel
           output_changed? or
           log_changed? or
           tasks_summary_changed? or
-          state_changed? or
+          (state_changed? && state != Cancelled) or
           components_changed?
         logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
         return false
@@ -571,24 +653,6 @@ class Job < ArvadosModel
   end
 
   def ensure_no_collection_uuids_in_script_params
-    # recursive_hash_search searches recursively through hashes and
-    # arrays in 'thing' for string fields matching regular expression
-    # 'pattern'.  Returns true if pattern is found, false otherwise.
-    def recursive_hash_search thing, pattern
-      if thing.is_a? Hash
-        thing.each do |k, v|
-          return true if recursive_hash_search v, pattern
-        end
-      elsif thing.is_a? Array
-        thing.each do |k|
-          return true if recursive_hash_search k, pattern
-        end
-      elsif thing.is_a? String
-        return true if thing.match pattern
-      end
-      false
-    end
-
     # Fail validation if any script_parameters field includes a string containing a
     # collection uuid pattern.
     if self.script_parameters_changed?
@@ -599,4 +663,22 @@ class Job < ArvadosModel
     end
     true
   end
+
+  # Walk +thing+ recursively, descending into Hash values and Array
+  # elements, and report whether any String leaf matches +pattern+.
+  # Returns true as soon as a match is found, false otherwise.
+  def recursive_hash_search thing, pattern
+    case thing
+    when Hash
+      thing.each_value.any? { |value| recursive_hash_search value, pattern }
+    when Array
+      thing.any? { |element| recursive_hash_search element, pattern }
+    when String
+      # String#match yields MatchData or nil; normalize to a boolean.
+      !thing.match(pattern).nil?
+    else
+      # Any other leaf type (numbers, nil, booleans, ...) cannot match.
+      false
+    end
+  end
 end