20877: Don't retry trashed container requests
[arvados.git] / services / api / app / models / container.rb
index 8999b3e14e123b78f8ecfaaa2ea821d8fa6e3490..e918650c9ffa77aa525d8eabaad379968564e9fa 100644 (file)
@@ -5,7 +5,7 @@
 require 'log_reuse_info'
 require 'whitelist_update'
 require 'safe_json'
-require 'update_priority'
+require 'update_priorities'
 
 class Container < ArvadosModel
   include ArvadosModelUpdates
@@ -21,7 +21,9 @@ class Container < ArvadosModel
   # already know how to properly treat them.
   attribute :secret_mounts, :jsonbHash, default: {}
   attribute :runtime_status, :jsonbHash, default: {}
-  attribute :runtime_auth_scopes, :jsonbHash, default: {}
+  attribute :runtime_auth_scopes, :jsonbArray, default: []
+  attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+  attribute :output_properties, :jsonbHash, default: {}
 
   serialize :environment, Hash
   serialize :mounts, Hash
@@ -29,6 +31,7 @@ class Container < ArvadosModel
   serialize :command, Array
   serialize :scheduling_parameters, Hash
 
+  after_find :fill_container_defaults_after_find
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :set_timestamps
   before_validation :check_lock
@@ -47,8 +50,6 @@ class Container < ArvadosModel
   before_save :clear_runtime_status_when_queued
   after_save :update_cr_logs
   after_save :handle_completed
-  after_save :propagate_priority
-  after_commit { UpdatePriority.run_update_thread }
 
   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
@@ -76,6 +77,12 @@ class Container < ArvadosModel
     t.add :runtime_user_uuid
     t.add :runtime_auth_scopes
     t.add :lock_count
+    t.add :gateway_address
+    t.add :interactive_session_started
+    t.add :output_storage_classes
+    t.add :output_properties
+    t.add :cost
+    t.add :subrequests_cost
   end
 
   # Supported states for a container
@@ -101,11 +108,11 @@ class Container < ArvadosModel
   end
 
   def self.full_text_searchable_columns
-    super - ["secret_mounts", "secret_mounts_md5", "runtime_token"]
+    super - ["secret_mounts", "secret_mounts_md5", "runtime_token", "gateway_address", "output_storage_classes"]
   end
 
   def self.searchable_columns *args
-    super - ["secret_mounts_md5", "runtime_token"]
+    super - ["secret_mounts_md5", "runtime_token", "gateway_address", "output_storage_classes"]
   end
 
   def logged_attributes
@@ -122,35 +129,8 @@ class Container < ArvadosModel
   # priority of a user-submitted request is a function of
   # user-assigned priority and request creation time.
   def update_priority!
-    return if ![Queued, Locked, Running].include?(state)
-    p = ContainerRequest.
-        where('container_uuid=? and priority>0', uuid).
-        includes(:requesting_container).
-        lock(true).
-        map do |cr|
-      if cr.requesting_container
-        cr.requesting_container.priority
-      else
-        (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
-      end
-    end.max || 0
-    update_attributes!(priority: p)
-  end
-
-  def propagate_priority
-    return true unless priority_changed?
-    act_as_system_user do
-      # Update the priority of child container requests to match new
-      # priority of the parent container (ignoring requests with no
-      # container assigned, because their priority doesn't matter).
-      ContainerRequest.
-        where(requesting_container_uuid: self.uuid,
-              state: ContainerRequest::Committed).
-        where('container_uuid is not null').
-        includes(:container).
-        map(&:container).
-        map(&:update_priority!)
-    end
+    update_priorities uuid
+    reload
   end
 
   # Create a new container (or find an existing one) to satisfy the
@@ -184,7 +164,8 @@ class Container < ArvadosModel
         secret_mounts: req.secret_mounts,
         runtime_token: req.runtime_token,
         runtime_user_uuid: runtime_user.uuid,
-        runtime_auth_scopes: runtime_auth_scopes
+        runtime_auth_scopes: runtime_auth_scopes,
+        output_storage_classes: req.output_storage_classes,
       }
     end
     act_as_system_user do
@@ -207,17 +188,19 @@ class Container < ArvadosModel
   # containers are suitable).
   def self.resolve_runtime_constraints(runtime_constraints)
     rc = {}
-    defaults = {
-      'keep_cache_ram' =>
-      Rails.configuration.Containers.DefaultKeepCacheRAM,
-    }
-    defaults.merge(runtime_constraints).each do |k, v|
+    runtime_constraints.each do |k, v|
       if v.is_a? Array
         rc[k] = v[0]
       else
         rc[k] = v
       end
     end
+    if rc['keep_cache_ram'] == 0
+      rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
+    end
+    if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
+      rc['keep_cache_disk'] = bound_keep_cache_disk(rc['ram'])
+    end
     rc
   end
 
@@ -284,7 +267,55 @@ class Container < ArvadosModel
     candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
     log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
 
-    candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
+    resolved_runtime_constraints = resolve_runtime_constraints(attrs[:runtime_constraints])
+    # Ideally we would completely ignore Keep cache constraints when making
+    # reuse considerations, but our database structure makes that impractical.
+    # The best we can do is generate a search that matches on all likely values.
+    runtime_constraint_variations = {
+      keep_cache_disk: [
+        # Check for constraints without keep_cache_disk
+        # (containers that predate the constraint)
+        nil,
+        # Containers that use keep_cache_ram instead
+        0,
+        # The default value
+        bound_keep_cache_disk(resolved_runtime_constraints['ram']),
+        # The minimum default bound
+        bound_keep_cache_disk(0),
+        # The maximum default bound (presumably)
+        bound_keep_cache_disk(1 << 60),
+        # The requested value
+        resolved_runtime_constraints.delete('keep_cache_disk'),
+      ].uniq,
+      keep_cache_ram: [
+        # Containers that use keep_cache_disk instead
+        0,
+        # The default value
+        Rails.configuration.Containers.DefaultKeepCacheRAM,
+        # The requested value
+        resolved_runtime_constraints.delete('keep_cache_ram'),
+      ].uniq,
+    }
+    resolved_cuda = resolved_runtime_constraints['cuda']
+    if resolved_cuda.nil? or resolved_cuda['device_count'] == 0
+      runtime_constraint_variations[:cuda] = [
+        # Check for constraints without cuda
+        # (containers that predate the constraint)
+        nil,
+        # The default "don't need CUDA" value
+        {
+          'device_count' => 0,
+          'driver_version' => '',
+          'hardware_capability' => '',
+        },
+        # The requested value
+        resolved_runtime_constraints.delete('cuda')
+      ].uniq
+    end
+    reusable_runtime_constraints = hash_product(runtime_constraint_variations)
+                                     .map { |v| resolved_runtime_constraints.merge(v) }
+
+    candidates = candidates.where_serialized(:runtime_constraints, reusable_runtime_constraints, md5: true, multivalue: true)
     log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
 
     log_reuse_info { "checking for state=Complete with readable output and log..." }
@@ -312,7 +343,7 @@ class Container < ArvadosModel
     # Check for non-failing Running candidates and return the most likely to finish sooner.
     log_reuse_info { "checking for state=Running..." }
     running = candidates.where(state: Running).
-              where("(runtime_status->'error') is null").
+              where("(runtime_status->'error') is null and priority > 0").
               order('progress desc, started_at asc').
               limit(1).first
     if running
@@ -326,10 +357,15 @@ class Container < ArvadosModel
     locked_or_queued = candidates.
                        where("state IN (?)", [Locked, Queued]).
                        order('state asc, priority desc, created_at asc').
-                       limit(1).first
-    if locked_or_queued
-      log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" }
-      return locked_or_queued
+                       limit(1)
+    if !attrs[:scheduling_parameters]['preemptible']
+      locked_or_queued = locked_or_queued.
+                           where("not ((scheduling_parameters::jsonb)->>'preemptible')::boolean")
+    end
+    chosen = locked_or_queued.first
+    if chosen
+      log_reuse_info { "done, reusing container #{chosen.uuid} with state=#{chosen.state}" }
+      return chosen
     else
       log_reuse_info { "have no containers in Locked or Queued state" }
     end
@@ -387,7 +423,7 @@ class Container < ArvadosModel
     if users_list.select { |u| u.is_admin }.any?
       return super
     end
-    Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
+    Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").arel.exists)
   end
 
   def final?
@@ -409,6 +445,31 @@ class Container < ArvadosModel
 
   protected
 
+  def self.bound_keep_cache_disk(value)
+    value ||= 0
+    min_value = 2 << 30
+    max_value = 32 << 30
+    if value < min_value
+      min_value
+    elsif value > max_value
+      max_value
+    else
+      value
+    end
+  end
+
+  def self.hash_product(**kwargs)
+    # kwargs is a hash that maps parameters to an array of values.
+    # This function enumerates every possible hash where each key has one of
+    # the values from its array.
+    # The output keys are strings since that's what container hash attributes
+    # want.
+    # A nil value yields a hash without that key.
+    [[:_, nil]].product(
+      *kwargs.map { |(key, values)| [key.to_s].product(values) },
+    ).map { |param_pairs| Hash[param_pairs].compact }
+  end
+
   def fill_field_defaults
     self.state ||= Queued
     self.environment ||= {}
@@ -423,6 +484,10 @@ class Container < ArvadosModel
     current_user.andand.is_admin
   end
 
+  def permission_to_destroy
+    current_user.andand.is_admin
+  end
+
   def ensure_owner_uuid_is_permitted
     # validate_change ensures owner_uuid can't be changed at all --
     # except during create, which requires admin privileges. Checking
@@ -453,15 +518,17 @@ class Container < ArvadosModel
 
   def validate_change
     permitted = [:state]
-    progress_attrs = [:progress, :runtime_status, :log, :output]
-    final_attrs = [:exit_code, :finished_at]
+    final_attrs = [:finished_at]
+    progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+                      :log, :output, :output_properties, :exit_code]
 
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
                      :environment, :mounts, :output_path, :priority,
                      :runtime_constraints, :scheduling_parameters,
                      :secret_mounts, :runtime_token,
-                     :runtime_user_uuid, :runtime_auth_scopes)
+                     :runtime_user_uuid, :runtime_auth_scopes,
+                     :output_storage_classes)
     end
 
     case self.state
@@ -472,10 +539,13 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, *progress_attrs
+      permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
       if self.state_changed?
         permitted.push :started_at
       end
+      if !self.interactive_session_started_was
+        permitted.push :interactive_session_started
+      end
 
     when Complete
       if self.state_was == Running
@@ -487,7 +557,7 @@ class Container < ArvadosModel
       when Running
         permitted.push :finished_at, *progress_attrs
       when Queued, Locked
-        permitted.push :finished_at, :log, :runtime_status
+        permitted.push :finished_at, :log, :runtime_status, :cost
       end
 
     else
@@ -552,9 +622,9 @@ class Container < ArvadosModel
     # If self.final?, this update is superfluous: the final log/output
     # update will be done when handle_completed calls finalize! on
     # each requesting CR.
-    return if self.final? || !self.log_changed?
+    return if self.final? || !saved_change_to_log?
     leave_modified_by_user_alone do
-      ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+      ContainerRequest.where(container_uuid: self.uuid, state: ContainerRequest::Committed).each do |cr|
         cr.update_collections(container: self, collections: ['log'])
         cr.save!
       end
@@ -566,8 +636,13 @@ class Container < ArvadosModel
          return errors.add :auth_uuid, 'is readonly'
     end
     if not [Locked, Running].include? self.state
-      # don't need one
-      self.auth.andand.update_attributes(expires_at: db_current_time)
+      # Don't need one. If auth already exists, expire it.
+      #
+      # We use db_transaction_time here (not db_current_time) to
+      # ensure the token doesn't validate later in the same
+      # transaction (e.g., in a test case) by satisfying expires_at >
+      # transaction timestamp.
+      self.auth.andand.update_attributes(expires_at: db_transaction_time)
       self.auth = nil
       return
     elsif self.auth
@@ -591,7 +666,8 @@ class Container < ArvadosModel
         self.runtime_auth_scopes = ["all"]
       end
 
-      # generate a new token
+      # Generate a new token. This runs with admin credentials as it's done by a
+      # dispatcher user, so expires_at isn't enforced by API.MaxTokenLifetime.
       self.auth = ApiClientAuthorization.
                     create!(user_id: User.find_by_uuid(self.runtime_user_uuid).id,
                             api_client_id: 0,
@@ -644,11 +720,11 @@ class Container < ArvadosModel
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
-    if self.state_changed? and self.final?
+    if saved_change_to_state? and self.final?
       # These get wiped out by with_lock (which reloads the record),
       # so record them now in case we need to schedule a retry.
-      prev_secret_mounts = self.secret_mounts_was
-      prev_runtime_token = self.runtime_token_was
+      prev_secret_mounts = secret_mounts_before_last_save
+      prev_runtime_token = runtime_token_before_last_save
 
       # Need to take a lock on the container to ensure that any
       # concurrent container requests that might try to reuse this
@@ -659,12 +735,57 @@ class Container < ArvadosModel
       self.with_lock do
         act_as_system_user do
           if self.state == Cancelled
-            retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
+            # Cancelled means the container didn't run to completion.
+            # This happens either because it was cancelled by the user
+            # or because there was an infrastructure failure.  We want
+            # to retry infrastructure failures automatically.
+            #
+            # Seach for live container requests to determine if we
+            # should retry the container.
+            retryable_requests = ContainerRequest.
+                                   joins('left outer join containers as requesting_container on container_requests.requesting_container_uuid = requesting_container.uuid').
+                                   where("container_requests.container_uuid = ? and "+
+                                         "container_requests.priority > 0 and "+
+                                         "container_requests.owner_uuid not in (select group_uuid from trashed_groups) "+
+                                         "(requesting_container.priority is null or (requesting_container.state = 'Running' and requesting_container.priority > 0)) and "+
+                                         "container_requests.state = 'Committed' and "+
+                                         "container_requests.container_count < container_requests.container_count_max", uuid).
+                                   order('container_requests.uuid asc')
           else
             retryable_requests = []
           end
 
           if retryable_requests.any?
+            scheduling_parameters = {
+              # partitions: empty if any are empty, else the union of all parameters
+              "partitions": retryable_requests
+                              .map { |req| req.scheduling_parameters["partitions"] || [] }
+                              .reduce { |cur, new| (cur.empty? or new.empty?) ? [] : (cur | new) },
+
+              # preemptible: true if all are true, else false
+              "preemptible": retryable_requests
+                               .map { |req| req.scheduling_parameters["preemptible"] }
+                               .all?,
+
+              # supervisor: true if any are true, else false
+              "supervisor": retryable_requests
+                               .map { |req| req.scheduling_parameters["supervisor"] }
+                               .any?,
+
+              # max_run_time: 0 if any are 0 (unlimited), else the maximum
+              "max_run_time": retryable_requests
+                                .map { |req| req.scheduling_parameters["max_run_time"] || 0 }
+                                .reduce do |cur, new|
+                if cur == 0 or new == 0
+                  0
+                elsif new > cur
+                  new
+                else
+                  cur
+                end
+              end,
+            }
+
             c_attrs = {
               command: self.command,
               cwd: self.cwd,
@@ -673,7 +794,7 @@ class Container < ArvadosModel
               container_image: self.container_image,
               mounts: self.mounts,
               runtime_constraints: self.runtime_constraints,
-              scheduling_parameters: self.scheduling_parameters,
+              scheduling_parameters: scheduling_parameters,
               secret_mounts: prev_secret_mounts,
               runtime_token: prev_runtime_token,
               runtime_user_uuid: self.runtime_user_uuid,
@@ -684,6 +805,7 @@ class Container < ArvadosModel
               cr.with_lock do
                 leave_modified_by_user_alone do
                   # Use row locking because this increments container_count
+                  cr.cumulative_cost += self.cost + self.subrequests_cost
                   cr.container_uuid = c.uuid
                   cr.save!
                 end
@@ -701,13 +823,13 @@ class Container < ArvadosModel
 
           # Cancel outstanding container requests made by this container.
           ContainerRequest.
-            includes(:container).
             where(requesting_container_uuid: uuid,
-                  state: ContainerRequest::Committed).each do |cr|
+                  state: ContainerRequest::Committed).
+            in_batches(of: 15).each_record do |cr|
             leave_modified_by_user_alone do
-              cr.update_attributes!(priority: 0)
-              cr.container.reload
-              if cr.container.state == Container::Queued || cr.container.state == Container::Locked
+              cr.set_priority_zero
+              container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
+              if container_state == Container::Queued || container_state == Container::Locked
                 # If the child container hasn't started yet, finalize the
                 # child CR now instead of leaving it "on hold", i.e.,
                 # Queued with priority 0.  (OTOH, if the child is already