Merge branch '21666-provision-test-improvement'
[arvados.git] / services / api / app / models / container.rb
index 21e8fcf508c4ee0951d4f43abe2c71702e40e62f..08dad2314e2c5383b204d60d070c39882ce657a8 100644 (file)
@@ -5,7 +5,7 @@
 require 'log_reuse_info'
 require 'whitelist_update'
 require 'safe_json'
-require 'update_priority'
+require 'update_priorities'
 
 class Container < ArvadosModel
   include ArvadosModelUpdates
@@ -30,6 +30,7 @@ class Container < ArvadosModel
   serialize :runtime_constraints, Hash
   serialize :command, Array
   serialize :scheduling_parameters, Hash
+  serialize :output_glob, Array
 
   after_find :fill_container_defaults_after_find
   before_validation :fill_field_defaults, :if => :new_record?
@@ -50,11 +51,16 @@ class Container < ArvadosModel
   before_save :clear_runtime_status_when_queued
   after_save :update_cr_logs
   after_save :handle_completed
-  after_save :propagate_priority
-  after_commit { UpdatePriority.run_update_thread }
 
-  has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
-  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
+  has_many :container_requests,
+           class_name: 'ContainerRequest',
+           foreign_key: 'container_uuid',
+           primary_key: 'uuid'
+  belongs_to :auth,
+             class_name: 'ApiClientAuthorization',
+             foreign_key: 'auth_uuid',
+             primary_key: 'uuid',
+             optional: true
 
   api_accessible :user, extend: :common do |t|
     t.add :command
@@ -68,6 +74,7 @@ class Container < ArvadosModel
     t.add :mounts
     t.add :output
     t.add :output_path
+    t.add :output_glob
     t.add :priority
     t.add :progress
     t.add :runtime_constraints
@@ -131,35 +138,8 @@ class Container < ArvadosModel
   # priority of a user-submitted request is a function of
   # user-assigned priority and request creation time.
   def update_priority!
-    return if ![Queued, Locked, Running].include?(state)
-    p = ContainerRequest.
-        where('container_uuid=? and priority>0', uuid).
-        includes(:requesting_container).
-        lock(true).
-        map do |cr|
-      if cr.requesting_container
-        cr.requesting_container.priority
-      else
-        (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
-      end
-    end.max || 0
-    update_attributes!(priority: p)
-  end
-
-  def propagate_priority
-    return true unless saved_change_to_priority?
-    act_as_system_user do
-      # Update the priority of child container requests to match new
-      # priority of the parent container (ignoring requests with no
-      # container assigned, because their priority doesn't matter).
-      ContainerRequest.
-        where(requesting_container_uuid: self.uuid,
-              state: ContainerRequest::Committed).
-        where('container_uuid is not null').
-        includes(:container).
-        map(&:container).
-        map(&:update_priority!)
-    end
+    update_priorities uuid
+    reload
   end
 
   # Create a new container (or find an existing one) to satisfy the
@@ -186,6 +166,7 @@ class Container < ArvadosModel
         cwd: req.cwd,
         environment: req.environment,
         output_path: req.output_path,
+        output_glob: req.output_glob,
         container_image: resolve_container_image(req.container_image),
         mounts: resolve_mounts(req.mounts),
         runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
@@ -228,10 +209,7 @@ class Container < ArvadosModel
       rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
     end
     if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
-      # If neither ram nor disk cache was specified and
-      # DefaultKeepCacheRAM==0, default to disk cache with size equal
-      # to RAM constraint (but at least 2 GiB and at most 32 GiB).
-      rc['keep_cache_disk'] = [[rc['ram'] || 0, 2 << 30].max, 32 << 30].min
+      rc['keep_cache_disk'] = bound_keep_cache_disk(rc['ram'])
     end
     rc
   end
@@ -288,6 +266,9 @@ class Container < ArvadosModel
     candidates = candidates.where('output_path = ?', attrs[:output_path])
     log_reuse_info(candidates) { "after filtering on output_path #{attrs[:output_path].inspect}" }
 
+    candidates = candidates.where_serialized(:output_glob, attrs[:output_glob], md5: true)
+    log_reuse_info(candidates) { "after filtering on output_glob #{attrs[:output_glob].inspect}" }
+
     image = resolve_container_image(attrs[:container_image])
     candidates = candidates.where('container_image = ?', image)
     log_reuse_info(candidates) { "after filtering on container_image #{image.inspect} (resolved from #{attrs[:container_image].inspect})" }
@@ -299,30 +280,55 @@ class Container < ArvadosModel
     candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
     log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
 
-    if attrs[:runtime_constraints]['cuda'].nil?
-      attrs[:runtime_constraints]['cuda'] = {
-        'device_count' => 0,
-        'driver_version' => '',
-        'hardware_capability' => '',
-      }
-    end
-    resolved_runtime_constraints = [resolve_runtime_constraints(attrs[:runtime_constraints])]
-    if resolved_runtime_constraints[0]['cuda']['device_count'] == 0
-      # If no CUDA requested, extend search to include older container
-      # records that don't have a 'cuda' section in runtime_constraints
-      resolved_runtime_constraints << resolved_runtime_constraints[0].except('cuda')
-    end
-    if resolved_runtime_constraints[0]['keep_cache_disk'] == 0
-      # If no disk cache requested, extend search to include older container
-      # records that don't have a 'keep_cache_disk' field in runtime_constraints
-      if resolved_runtime_constraints.length == 2
-        # exclude the one that also excludes CUDA
-        resolved_runtime_constraints << resolved_runtime_constraints[1].except('keep_cache_disk')
-      end
-      resolved_runtime_constraints << resolved_runtime_constraints[0].except('keep_cache_disk')
-    end
-
-    candidates = candidates.where_serialized(:runtime_constraints, resolved_runtime_constraints, md5: true, multivalue: true)
+    resolved_runtime_constraints = resolve_runtime_constraints(attrs[:runtime_constraints])
+    # Ideally we would completely ignore Keep cache constraints when making
+    # reuse considerations, but our database structure makes that impractical.
+    # The best we can do is generate a search that matches on all likely values.
+    runtime_constraint_variations = {
+      keep_cache_disk: [
+        # Check for constraints without keep_cache_disk
+        # (containers that predate the constraint)
+        nil,
+        # Containers that use keep_cache_ram instead
+        0,
+        # The default value
+        bound_keep_cache_disk(resolved_runtime_constraints['ram']),
+        # The minimum default bound
+        bound_keep_cache_disk(0),
+        # The maximum default bound (presumably)
+        bound_keep_cache_disk(1 << 60),
+        # The requested value
+        resolved_runtime_constraints.delete('keep_cache_disk'),
+      ].uniq,
+      keep_cache_ram: [
+        # Containers that use keep_cache_disk instead
+        0,
+        # The default value
+        Rails.configuration.Containers.DefaultKeepCacheRAM,
+        # The requested value
+        resolved_runtime_constraints.delete('keep_cache_ram'),
+      ].uniq,
+    }
+    resolved_cuda = resolved_runtime_constraints['cuda']
+    if resolved_cuda.nil? or resolved_cuda['device_count'] == 0
+      runtime_constraint_variations[:cuda] = [
+        # Check for constraints without cuda
+        # (containers that predate the constraint)
+        nil,
+        # The default "don't need CUDA" value
+        {
+          'device_count' => 0,
+          'driver_version' => '',
+          'hardware_capability' => '',
+        },
+        # The requested value
+        resolved_runtime_constraints.delete('cuda')
+      ].uniq
+    end
+    reusable_runtime_constraints = hash_product(**runtime_constraint_variations)
+                                     .map { |v| resolved_runtime_constraints.merge(v) }
+
+    candidates = candidates.where_serialized(:runtime_constraints, reusable_runtime_constraints, md5: true, multivalue: true)
     log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
 
     log_reuse_info { "checking for state=Complete with readable output and log..." }
@@ -350,7 +356,7 @@ class Container < ArvadosModel
     # Check for non-failing Running candidates and return the most likely to finish sooner.
     log_reuse_info { "checking for state=Running..." }
     running = candidates.where(state: Running).
-              where("(runtime_status->'error') is null").
+              where("(runtime_status->'error') is null and priority > 0").
               order('progress desc, started_at asc').
               limit(1).first
     if running
@@ -364,10 +370,15 @@ class Container < ArvadosModel
     locked_or_queued = candidates.
                        where("state IN (?)", [Locked, Queued]).
                        order('state asc, priority desc, created_at asc').
-                       limit(1).first
-    if locked_or_queued
-      log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" }
-      return locked_or_queued
+                       limit(1)
+    if !attrs[:scheduling_parameters]['preemptible']
+      locked_or_queued = locked_or_queued.
+                           where("not ((scheduling_parameters::jsonb)->>'preemptible')::boolean")
+    end
+    chosen = locked_or_queued.first
+    if chosen
+      log_reuse_info { "done, reusing container #{chosen.uuid} with state=#{chosen.state}" }
+      return chosen
     else
       log_reuse_info { "have no containers in Locked or Queued state" }
     end
@@ -381,7 +392,7 @@ class Container < ArvadosModel
       if self.state != Queued
         raise LockFailedError.new("cannot lock when #{self.state}")
       end
-      self.update_attributes!(state: Locked)
+      self.update!(state: Locked)
     end
   end
 
@@ -399,7 +410,7 @@ class Container < ArvadosModel
       if self.state != Locked
         raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
       end
-      self.update_attributes!(state: Queued)
+      self.update!(state: Queued)
     end
   end
 
@@ -447,11 +458,37 @@ class Container < ArvadosModel
 
   protected
 
+  def self.bound_keep_cache_disk(value)
+    value ||= 0
+    min_value = 2 << 30
+    max_value = 32 << 30
+    if value < min_value
+      min_value
+    elsif value > max_value
+      max_value
+    else
+      value
+    end
+  end
+
+  def self.hash_product(**kwargs)
+    # kwargs is a hash that maps parameters to an array of values.
+    # This function enumerates every possible hash where each key has one of
+    # the values from its array.
+    # The output keys are strings since that's what container hash attributes
+    # want.
+    # A nil value yields a hash without that key.
+    [[:_, nil]].product(
+      *kwargs.map { |(key, values)| [key.to_s].product(values) },
+    ).map { |param_pairs| Hash[param_pairs].compact }
+  end
+
   def fill_field_defaults
     self.state ||= Queued
     self.environment ||= {}
     self.runtime_constraints ||= {}
     self.mounts ||= {}
+    self.output_glob ||= []
     self.cwd ||= "."
     self.priority ||= 0
     self.scheduling_parameters ||= {}
@@ -501,11 +538,11 @@ class Container < ArvadosModel
 
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
-                     :environment, :mounts, :output_path, :priority,
-                     :runtime_constraints, :scheduling_parameters,
-                     :secret_mounts, :runtime_token,
-                     :runtime_user_uuid, :runtime_auth_scopes,
-                     :output_storage_classes)
+                     :environment, :mounts, :output_path, :output_glob,
+                     :priority, :runtime_constraints,
+                     :scheduling_parameters, :secret_mounts,
+                     :runtime_token, :runtime_user_uuid,
+                     :runtime_auth_scopes, :output_storage_classes)
     end
 
     case self.state
@@ -601,7 +638,7 @@ class Container < ArvadosModel
     # each requesting CR.
     return if self.final? || !saved_change_to_log?
     leave_modified_by_user_alone do
-      ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+      ContainerRequest.where(container_uuid: self.uuid, state: ContainerRequest::Committed).each do |cr|
         cr.update_collections(container: self, collections: ['log'])
         cr.save!
       end
@@ -619,7 +656,7 @@ class Container < ArvadosModel
       # ensure the token doesn't validate later in the same
       # transaction (e.g., in a test case) by satisfying expires_at >
       # transaction timestamp.
-      self.auth.andand.update_attributes(expires_at: db_transaction_time)
+      self.auth.andand.update(expires_at: db_transaction_time)
       self.auth = nil
       return
     elsif self.auth
@@ -712,21 +749,67 @@ class Container < ArvadosModel
       self.with_lock do
         act_as_system_user do
           if self.state == Cancelled
-            retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
+            # Cancelled means the container didn't run to completion.
+            # This happens either because it was cancelled by the user
+            # or because there was an infrastructure failure.  We want
+            # to retry infrastructure failures automatically.
+            #
+            # Seach for live container requests to determine if we
+            # should retry the container.
+            retryable_requests = ContainerRequest.
+                                   joins('left outer join containers as requesting_container on container_requests.requesting_container_uuid = requesting_container.uuid').
+                                   where("container_requests.container_uuid = ? and "+
+                                         "container_requests.priority > 0 and "+
+                                         "container_requests.owner_uuid not in (select group_uuid from trashed_groups) and "+
+                                         "(requesting_container.priority is null or (requesting_container.state = 'Running' and requesting_container.priority > 0)) and "+
+                                         "container_requests.state = 'Committed' and "+
+                                         "container_requests.container_count < container_requests.container_count_max", uuid).
+                                   order('container_requests.uuid asc')
           else
             retryable_requests = []
           end
 
           if retryable_requests.any?
+            scheduling_parameters = {
+              # partitions: empty if any are empty, else the union of all parameters
+              "partitions": retryable_requests
+                              .map { |req| req.scheduling_parameters["partitions"] || [] }
+                              .reduce { |cur, new| (cur.empty? or new.empty?) ? [] : (cur | new) },
+
+              # preemptible: true if all are true, else false
+              "preemptible": retryable_requests
+                               .map { |req| req.scheduling_parameters["preemptible"] }
+                               .all?,
+
+              # supervisor: true if all any true, else false
+              "supervisor": retryable_requests
+                               .map { |req| req.scheduling_parameters["supervisor"] }
+                               .any?,
+
+              # max_run_time: 0 if any are 0 (unlimited), else the maximum
+              "max_run_time": retryable_requests
+                                .map { |req| req.scheduling_parameters["max_run_time"] || 0 }
+                                .reduce do |cur, new|
+                if cur == 0 or new == 0
+                  0
+                elsif new > cur
+                  new
+                else
+                  cur
+                end
+              end,
+            }
+
             c_attrs = {
               command: self.command,
               cwd: self.cwd,
               environment: self.environment,
               output_path: self.output_path,
+              output_glob: self.output_glob,
               container_image: self.container_image,
               mounts: self.mounts,
               runtime_constraints: self.runtime_constraints,
-              scheduling_parameters: self.scheduling_parameters,
+              scheduling_parameters: scheduling_parameters,
               secret_mounts: prev_secret_mounts,
               runtime_token: prev_runtime_token,
               runtime_user_uuid: self.runtime_user_uuid,
@@ -755,19 +838,19 @@ class Container < ArvadosModel
 
           # Cancel outstanding container requests made by this container.
           ContainerRequest.
-            includes(:container).
             where(requesting_container_uuid: uuid,
-                  state: ContainerRequest::Committed).each do |cr|
+                  state: ContainerRequest::Committed).
+            in_batches(of: 15).each_record do |cr|
             leave_modified_by_user_alone do
-              cr.update_attributes!(priority: 0)
-              cr.container.reload
-              if cr.container.state == Container::Queued || cr.container.state == Container::Locked
+              cr.set_priority_zero
+              container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
+              if container_state == Container::Queued || container_state == Container::Locked
                 # If the child container hasn't started yet, finalize the
                 # child CR now instead of leaving it "on hold", i.e.,
                 # Queued with priority 0.  (OTOH, if the child is already
                 # running, leave it alone so it can get cancelled the
                 # usual way, get a copy of the log collection, etc.)
-                cr.update_attributes!(state: ContainerRequest::Final)
+                cr.update!(state: ContainerRequest::Final)
               end
             end
           end