require 'log_reuse_info'
require 'whitelist_update'
require 'safe_json'
-require 'update_priority'
+require 'update_priorities'
class Container < ArvadosModel
include ArvadosModelUpdates
extend DbCurrentTime
extend LogReuseInfo
+ # PostgreSQL JSONB columns should NOT be declared as serialized; Rails 5
+ # already knows how to treat them properly.
+ attribute :secret_mounts, :jsonbHash, default: {}
+ attribute :runtime_status, :jsonbHash, default: {}
+ attribute :runtime_auth_scopes, :jsonbArray, default: []
+ attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
+
serialize :environment, Hash
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
serialize :scheduling_parameters, Hash
- serialize :secret_mounts, Hash
- serialize :runtime_status, Hash
+ after_find :fill_container_defaults_after_find
before_validation :fill_field_defaults, :if => :new_record?
before_validation :set_timestamps
+ before_validation :check_lock
+ before_validation :check_unlock
validates :command, :container_image, :output_path, :cwd, :priority, { presence: true }
validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validate :validate_runtime_status
before_save :clear_runtime_status_when_queued
after_save :update_cr_logs
after_save :handle_completed
- after_save :propagate_priority
- after_commit { UpdatePriority.run_update_thread }
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
t.add :scheduling_parameters
t.add :runtime_user_uuid
t.add :runtime_auth_scopes
+ t.add :lock_count
+ t.add :gateway_address
+ t.add :interactive_session_started
+ t.add :output_storage_classes
+ t.add :output_properties
+ t.add :cost
+ t.add :subrequests_cost
end
# Supported states for a container
nil => [Queued],
Queued => [Locked, Cancelled],
Locked => [Queued, Running, Cancelled],
- Running => [Complete, Cancelled]
+ Running => [Complete, Cancelled],
+ Complete => [Cancelled]
}
def self.limit_index_columns_read
end
def self.full_text_searchable_columns
- super - ["secret_mounts", "secret_mounts_md5", "runtime_token"]
+ super - ["secret_mounts", "secret_mounts_md5", "runtime_token", "gateway_address", "output_storage_classes"]
end
def self.searchable_columns *args
- super - ["secret_mounts_md5", "runtime_token"]
+ super - ["secret_mounts_md5", "runtime_token", "gateway_address", "output_storage_classes"]
end
def logged_attributes
# priority of a user-submitted request is a function of
# user-assigned priority and request creation time.
def update_priority!
- return if ![Queued, Locked, Running].include?(state)
- p = ContainerRequest.
- where('container_uuid=? and priority>0', uuid).
- includes(:requesting_container).
- lock(true).
- map do |cr|
- if cr.requesting_container
- cr.requesting_container.priority
- else
- (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
- end
- end.max || 0
- update_attributes!(priority: p)
- end
-
- def propagate_priority
- return true unless priority_changed?
- act_as_system_user do
- # Update the priority of child container requests to match new
- # priority of the parent container (ignoring requests with no
- # container assigned, because their priority doesn't matter).
- ContainerRequest.
- where(requesting_container_uuid: self.uuid,
- state: ContainerRequest::Committed).
- where('container_uuid is not null').
- includes(:container).
- map(&:container).
- map(&:update_priority!)
- end
+ update_priorities uuid
+ reload
end
# Create a new container (or find an existing one) to satisfy the
secret_mounts: req.secret_mounts,
runtime_token: req.runtime_token,
runtime_user_uuid: runtime_user.uuid,
- runtime_auth_scopes: runtime_auth_scopes
+ runtime_auth_scopes: runtime_auth_scopes,
+ output_storage_classes: req.output_storage_classes,
}
end
act_as_system_user do
# containers are suitable).
def self.resolve_runtime_constraints(runtime_constraints)
rc = {}
- defaults = {
- 'keep_cache_ram' =>
- Rails.configuration.container_default_keep_cache_ram,
- }
- defaults.merge(runtime_constraints).each do |k, v|
+ runtime_constraints.each do |k, v|
if v.is_a? Array
rc[k] = v[0]
else
rc[k] = v
end
end
+ if rc['keep_cache_ram'] == 0
+ rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
+ end
+ if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
+ rc['keep_cache_disk'] = bound_keep_cache_disk(rc['ram'])
+ end
rc
end
candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
- candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
+ resolved_runtime_constraints = resolve_runtime_constraints(attrs[:runtime_constraints])
+ # Ideally we would completely ignore Keep cache constraints when making
+ # reuse considerations, but our database structure makes that impractical.
+ # The best we can do is generate a search that matches on all likely values.
+ runtime_constraint_variations = {
+ keep_cache_disk: [
+ # Check for constraints without keep_cache_disk
+ # (containers that predate the constraint)
+ nil,
+ # Containers that use keep_cache_ram instead
+ 0,
+ # The default value
+ bound_keep_cache_disk(resolved_runtime_constraints['ram']),
+ # The minimum default bound
+ bound_keep_cache_disk(0),
+ # The maximum default bound (presumably)
+ bound_keep_cache_disk(1 << 60),
+ # The requested value
+ resolved_runtime_constraints.delete('keep_cache_disk'),
+ ].uniq,
+ keep_cache_ram: [
+ # Containers that use keep_cache_disk instead
+ 0,
+ # The default value
+ Rails.configuration.Containers.DefaultKeepCacheRAM,
+ # The requested value
+ resolved_runtime_constraints.delete('keep_cache_ram'),
+ ].uniq,
+ }
+ resolved_cuda = resolved_runtime_constraints['cuda']
+ if resolved_cuda.nil? or resolved_cuda['device_count'] == 0
+ runtime_constraint_variations[:cuda] = [
+ # Check for constraints without cuda
+ # (containers that predate the constraint)
+ nil,
+ # The default "don't need CUDA" value
+ {
+ 'device_count' => 0,
+ 'driver_version' => '',
+ 'hardware_capability' => '',
+ },
+ # The requested value
+ resolved_runtime_constraints.delete('cuda')
+ ].uniq
+ end
+ reusable_runtime_constraints = hash_product(runtime_constraint_variations)
+ .map { |v| resolved_runtime_constraints.merge(v) }
+
+ candidates = candidates.where_serialized(:runtime_constraints, reusable_runtime_constraints, md5: true, multivalue: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
log_reuse_info { "checking for state=Complete with readable output and log..." }
nil
end
- def check_lock_fail
- if self.state != Queued
- raise LockFailedError.new("cannot lock when #{self.state}")
- elsif self.priority <= 0
- raise LockFailedError.new("cannot lock when priority<=0")
+ def lock
+ self.with_lock do
+ if self.state != Queued
+ raise LockFailedError.new("cannot lock when #{self.state}")
+ end
+ self.update_attributes!(state: Locked)
end
end
- def lock
- # Check invalid state transitions once before getting the lock
- # (because it's cheaper that way) and once after getting the lock
- # (because state might have changed while acquiring the lock).
- check_lock_fail
- transaction do
- reload
- check_lock_fail
- update_attributes!(state: Locked, lock_count: self.lock_count+1)
+ def check_lock
+ if state_was == Queued and state == Locked
+ if self.priority <= 0
+ raise LockFailedError.new("cannot lock when priority<=0")
+ end
+ self.lock_count = self.lock_count+1
end
end
- def check_unlock_fail
- if self.state != Locked
- raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
- elsif self.locked_by_uuid != current_api_client_authorization.uuid
- raise InvalidStateTransitionError.new("locked by a different token")
+ def unlock
+ self.with_lock do
+ if self.state != Locked
+ raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
+ end
+ self.update_attributes!(state: Queued)
end
end
- def unlock
- # Check invalid state transitions twice (see lock)
- check_unlock_fail
- transaction do
- reload(lock: 'FOR UPDATE')
- check_unlock_fail
- if self.lock_count < Rails.configuration.max_container_dispatch_attempts
- update_attributes!(state: Queued)
- else
- update_attributes!(state: Cancelled,
- runtime_status: {
- error: "Container exceeded 'max_container_dispatch_attempts' (lock_count=#{self.lock_count}."
- })
+ def check_unlock
+ if state_was == Locked and state == Queued
+ if self.locked_by_uuid != current_api_client_authorization.uuid
+ raise ArvadosModel::PermissionDeniedError.new("locked by a different token")
+ end
+ if self.lock_count >= Rails.configuration.Containers.MaxDispatchAttempts
+ self.state = Cancelled
+ self.runtime_status = {error: "Failed to start container. Cancelled after exceeding 'Containers.MaxDispatchAttempts' (lock_count=#{self.lock_count})"}
end
end
end
if users_list.select { |u| u.is_admin }.any?
return super
end
- Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
+ Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").arel.exists)
end
def final?
end
end
- # NOTE: Migration 20190322174136_add_file_info_to_collection.rb relies on this function.
- #
- # Change with caution!
- #
- # Correctly groups pdhs to use for batch database updates. Helps avoid
- # updating too many database rows in a single transaction.
- def self.group_pdhs_for_multiple_transactions(log_prefix)
- batch_size_max = 1 << 28 # 256 MiB
- last_pdh = '0'
- done = 0
- any = true
-
- total = ActiveRecord::Base.connection.exec_query(
- 'SELECT DISTINCT portable_data_hash FROM collections'
- ).rows.count
-
- while any
- any = false
- pdhs_res = ActiveRecord::Base.connection.exec_query(
- 'SELECT DISTINCT portable_data_hash FROM collections '\
- "WHERE portable_data_hash > '#{last_pdh}' "\
- 'GROUP BY portable_data_hash LIMIT 1000'
- )
- break if pdhs_res.rows.count.zero?
-
- pdhs = pdhs_res.rows.collect { |r| r[0] }
- Container.group_pdhs_by_manifest_size(pdhs, batch_size_max) do |grouped_pdhs|
- any = true
- yield grouped_pdhs
- done += grouped_pdhs.size
- last_pdh = pdhs[-1]
- Rails.logger.info(log_prefix + ": #{done}/#{total}")
- end
- end
- Rails.logger.info(log_prefix + ': finished')
- end
+ protected
- # NOTE: Migration 20190322174136_add_file_info_to_collection.rb relies on this function.
- #
- # Change with caution!
- #
- # Given an array of pdhs, yield a subset array of pdhs when the total
- # size of all manifest_texts is no more than batch_size_max. Pdhs whose manifest_text
- # is bigger than batch_size_max are yielded by themselves
- def self.group_pdhs_by_manifest_size(pdhs, batch_size_max)
- batch_size = 0
- batch_pdhs = {}
- pdhs.each do |pdh|
- manifest_size = pdh.split('+')[1].to_i
- if batch_size > 0 && batch_size + manifest_size > batch_size_max
- yield batch_pdhs.keys
- batch_pdhs = {}
- batch_size = 0
- end
- batch_pdhs[pdh] = true
- batch_size += manifest_size
+ def self.bound_keep_cache_disk(value)
+ value ||= 0
+ min_value = 2 << 30
+ max_value = 32 << 30
+ if value < min_value
+ min_value
+ elsif value > max_value
+ max_value
+ else
+ value
end
- yield batch_pdhs.keys
end
- protected
+ def self.hash_product(**kwargs)
+ # kwargs is a hash that maps parameters to an array of values.
+ # This function enumerates every possible hash where each key has one of
+ # the values from its array.
+ # The output keys are strings since that's what container hash attributes
+ # want.
+ # A nil value yields a hash without that key.
+ [[:_, nil]].product(
+ *kwargs.map { |(key, values)| [key.to_s].product(values) },
+ ).map { |param_pairs| Hash[param_pairs].compact }
+ end
def fill_field_defaults
self.state ||= Queued
current_user.andand.is_admin
end
+ def permission_to_destroy
+ current_user.andand.is_admin
+ end
+
def ensure_owner_uuid_is_permitted
# validate_change ensures owner_uuid can't be changed at all --
# except during create, which requires admin privileges. Checking
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output]
- final_attrs = [:exit_code, :finished_at]
+ final_attrs = [:finished_at]
+ progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+ :log, :output, :output_properties, :exit_code]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
:environment, :mounts, :output_path, :priority,
:runtime_constraints, :scheduling_parameters,
:secret_mounts, :runtime_token,
- :runtime_user_uuid, :runtime_auth_scopes)
+ :runtime_user_uuid, :runtime_auth_scopes,
+ :output_storage_classes)
end
case self.state
permitted.push :priority
when Running
- permitted.push :priority, *progress_attrs
+ permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
if self.state_changed?
permitted.push :started_at
end
+ if !self.interactive_session_started_was
+ permitted.push :interactive_session_started
+ end
when Complete
if self.state_was == Running
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log, :runtime_status
+ permitted.push :finished_at, :log, :runtime_status, :cost
end
else
return false
end
- if self.state == Running &&
+ if self.state_was == Running &&
!current_api_client_authorization.nil? &&
(current_api_client_authorization.uuid == self.auth_uuid ||
current_api_client_authorization.token == self.runtime_token)
# change priority or log.
permitted.push *final_attrs
permitted = permitted - [:log, :priority]
+ elsif !current_user.andand.is_admin
+ raise PermissionDeniedError
elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
# When locked, progress fields cannot be updated by the wrong
# dispatcher, even though it has admin privileges.
# If self.final?, this update is superfluous: the final log/output
# update will be done when handle_completed calls finalize! on
# each requesting CR.
- return if self.final? || !self.log_changed?
+ return if self.final? || !saved_change_to_log?
leave_modified_by_user_alone do
- ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+ ContainerRequest.where(container_uuid: self.uuid, state: ContainerRequest::Committed).each do |cr|
cr.update_collections(container: self, collections: ['log'])
cr.save!
end
return errors.add :auth_uuid, 'is readonly'
end
if not [Locked, Running].include? self.state
- # don't need one
- self.auth.andand.update_attributes(expires_at: db_current_time)
+ # Don't need one. If auth already exists, expire it.
+ #
+ # We use db_transaction_time here (not db_current_time) to
+ # ensure the token doesn't validate later in the same
+ # transaction (e.g., in a test case) by satisfying expires_at >
+ # transaction timestamp.
+ self.auth.andand.update_attributes(expires_at: db_transaction_time)
self.auth = nil
return
elsif self.auth
self.runtime_auth_scopes = ["all"]
end
- # generate a new token
+ # Generate a new token. This runs with admin credentials as it's done by a
+ # dispatcher user, so expires_at isn't enforced by API.MaxTokenLifetime.
self.auth = ApiClientAuthorization.
create!(user_id: User.find_by_uuid(self.runtime_user_uuid).id,
api_client_id: 0,
def handle_completed
# This container is finished so finalize any associated container requests
# that are associated with this container.
- if self.state_changed? and self.final?
- act_as_system_user do
-
- if self.state == Cancelled
- retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
- else
- retryable_requests = []
- end
+ if saved_change_to_state? and self.final?
+ # These get wiped out by with_lock (which reloads the record),
+ # so record them now in case we need to schedule a retry.
+ prev_secret_mounts = secret_mounts_before_last_save
+ prev_runtime_token = runtime_token_before_last_save
+
+ # Need to take a lock on the container to ensure that any
+ # concurrent container requests that might try to reuse this
+ # container will block until the container completion
+ # transaction finishes. This ensures that concurrent container
+ # requests that try to reuse this container are finalized (on
+ # Complete) or don't reuse it (on Cancelled).
+ self.with_lock do
+ act_as_system_user do
+ if self.state == Cancelled
+ retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
+ else
+ retryable_requests = []
+ end
- if retryable_requests.any?
- c_attrs = {
- command: self.command,
- cwd: self.cwd,
- environment: self.environment,
- output_path: self.output_path,
- container_image: self.container_image,
- mounts: self.mounts,
- runtime_constraints: self.runtime_constraints,
- scheduling_parameters: self.scheduling_parameters,
- secret_mounts: self.secret_mounts_was,
- runtime_token: self.runtime_token_was,
- runtime_user_uuid: self.runtime_user_uuid,
- runtime_auth_scopes: self.runtime_auth_scopes
- }
- c = Container.create! c_attrs
- retryable_requests.each do |cr|
- cr.with_lock do
- leave_modified_by_user_alone do
- # Use row locking because this increments container_count
- cr.container_uuid = c.uuid
- cr.save!
+ if retryable_requests.any?
+ scheduling_parameters = {
+ # partitions: empty if any are empty, else the union of all partitions
+ "partitions": retryable_requests
+ .map { |req| req.scheduling_parameters["partitions"] || [] }
+ .reduce { |cur, new| (cur.empty? or new.empty?) ? [] : (cur | new) },
+
+ # preemptible: true if all are true, else false
+ "preemptible": retryable_requests
+ .map { |req| req.scheduling_parameters["preemptible"] }
+ .all?,
+
+ # supervisor: true if any are true, else false
+ "supervisor": retryable_requests
+ .map { |req| req.scheduling_parameters["supervisor"] }
+ .any?,
+
+ # max_run_time: 0 if any are 0 (unlimited), else the maximum
+ "max_run_time": retryable_requests
+ .map { |req| req.scheduling_parameters["max_run_time"] || 0 }
+ .reduce do |cur, new|
+ if cur == 0 or new == 0
+ 0
+ elsif new > cur
+ new
+ else
+ cur
+ end
+ end,
+ }
+
+ c_attrs = {
+ command: self.command,
+ cwd: self.cwd,
+ environment: self.environment,
+ output_path: self.output_path,
+ container_image: self.container_image,
+ mounts: self.mounts,
+ runtime_constraints: self.runtime_constraints,
+ scheduling_parameters: scheduling_parameters,
+ secret_mounts: prev_secret_mounts,
+ runtime_token: prev_runtime_token,
+ runtime_user_uuid: self.runtime_user_uuid,
+ runtime_auth_scopes: self.runtime_auth_scopes
+ }
+ c = Container.create! c_attrs
+ retryable_requests.each do |cr|
+ cr.with_lock do
+ leave_modified_by_user_alone do
+ # Use row locking because this increments container_count
+ cr.cumulative_cost += self.cost + self.subrequests_cost
+ cr.container_uuid = c.uuid
+ cr.save!
+ end
end
end
end
- end
- # Notify container requests associated with this container
- ContainerRequest.where(container_uuid: uuid,
- state: ContainerRequest::Committed).each do |cr|
- leave_modified_by_user_alone do
- cr.finalize!
+ # Notify container requests associated with this container
+ ContainerRequest.where(container_uuid: uuid,
+ state: ContainerRequest::Committed).each do |cr|
+ leave_modified_by_user_alone do
+ cr.finalize!
+ end
end
- end
- # Cancel outstanding container requests made by this container.
- ContainerRequest.
- includes(:container).
- where(requesting_container_uuid: uuid,
- state: ContainerRequest::Committed).each do |cr|
- leave_modified_by_user_alone do
- cr.update_attributes!(priority: 0)
- cr.container.reload
- if cr.container.state == Container::Queued || cr.container.state == Container::Locked
- # If the child container hasn't started yet, finalize the
- # child CR now instead of leaving it "on hold", i.e.,
- # Queued with priority 0. (OTOH, if the child is already
- # running, leave it alone so it can get cancelled the
- # usual way, get a copy of the log collection, etc.)
- cr.update_attributes!(state: ContainerRequest::Final)
+ # Cancel outstanding container requests made by this container.
+ ContainerRequest.
+ where(requesting_container_uuid: uuid,
+ state: ContainerRequest::Committed).
+ in_batches(of: 15).each_record do |cr|
+ leave_modified_by_user_alone do
+ cr.set_priority_zero
+ container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
+ if container_state == Container::Queued || container_state == Container::Locked
+ # If the child container hasn't started yet, finalize the
+ # child CR now instead of leaving it "on hold", i.e.,
+ # Queued with priority 0. (OTOH, if the child is already
+ # running, leave it alone so it can get cancelled the
+ # usual way, get a copy of the log collection, etc.)
+ cr.update_attributes!(state: ContainerRequest::Final)
+ end
end
end
end