require 'log_reuse_info'
require 'whitelist_update'
require 'safe_json'
-require 'update_priority'
+require 'update_priorities'
class Container < ArvadosModel
include ArvadosModelUpdates
before_save :clear_runtime_status_when_queued
after_save :update_cr_logs
after_save :handle_completed
- after_save :propagate_priority
- after_commit { UpdatePriority.run_update_thread }
- has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
- belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
+ has_many :container_requests,
+ class_name: 'ContainerRequest',
+ foreign_key: 'container_uuid',
+ primary_key: 'uuid'
+ belongs_to :auth,
+ class_name: 'ApiClientAuthorization',
+ foreign_key: 'auth_uuid',
+ primary_key: 'uuid',
+ optional: true
api_accessible :user, extend: :common do |t|
t.add :command
t.add :interactive_session_started
t.add :output_storage_classes
t.add :output_properties
+ t.add :cost
+ t.add :subrequests_cost
end
# Supported states for a container
# priority of a user-submitted request is a function of
# user-assigned priority and request creation time.
def update_priority!
- return if ![Queued, Locked, Running].include?(state)
- p = ContainerRequest.
- where('container_uuid=? and priority>0', uuid).
- includes(:requesting_container).
- lock(true).
- map do |cr|
- if cr.requesting_container
- cr.requesting_container.priority
- else
- (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
- end
- end.max || 0
- update_attributes!(priority: p)
- end
-
- def propagate_priority
- return true unless saved_change_to_priority?
- act_as_system_user do
- # Update the priority of child container requests to match new
- # priority of the parent container (ignoring requests with no
- # container assigned, because their priority doesn't matter).
- ContainerRequest.
- where(requesting_container_uuid: self.uuid,
- state: ContainerRequest::Committed).
- where('container_uuid is not null').
- includes(:container).
- map(&:container).
- map(&:update_priority!)
- end
+ # Recompute this container's priority (and, presumably, that of its
+ # descendants) via the shared update_priorities helper from the
+ # 'update_priorities' lib required at the top of this file, replacing
+ # the old in-model calculation and the propagate_priority callback.
+ # Reload afterwards so callers observe the freshly written priority
+ # on this record. NOTE(review): exact propagation semantics live in
+ # update_priorities — confirm there, not here.
+ update_priorities uuid
+ reload
end
# Create a new container (or find an existing one) to satisfy the
if rc['keep_cache_ram'] == 0
rc['keep_cache_ram'] = Rails.configuration.Containers.DefaultKeepCacheRAM
end
+ if rc['keep_cache_disk'] == 0 and rc['keep_cache_ram'] == 0
+ rc['keep_cache_disk'] = bound_keep_cache_disk(rc['ram'])
+ end
rc
end
candidates = candidates.where('secret_mounts_md5 = ?', secret_mounts_md5)
log_reuse_info(candidates) { "after filtering on secret_mounts_md5 #{secret_mounts_md5.inspect}" }
- if attrs[:runtime_constraints]['cuda'].nil?
- attrs[:runtime_constraints]['cuda'] = {
- 'device_count' => 0,
- 'driver_version' => '',
- 'hardware_capability' => '',
- }
- end
- resolved_runtime_constraints = [resolve_runtime_constraints(attrs[:runtime_constraints])]
- if resolved_runtime_constraints[0]['cuda']['device_count'] == 0
- # If no CUDA requested, extend search to include older container
- # records that don't have a 'cuda' section in runtime_constraints
- resolved_runtime_constraints << resolved_runtime_constraints[0].except('cuda')
- end
-
- candidates = candidates.where_serialized(:runtime_constraints, resolved_runtime_constraints, md5: true, multivalue: true)
+ resolved_runtime_constraints = resolve_runtime_constraints(attrs[:runtime_constraints])
+ # Ideally we would completely ignore Keep cache constraints when making
+ # reuse considerations, but our database structure makes that impractical.
+ # The best we can do is generate a search that matches on all likely values.
+ runtime_constraint_variations = {
+ keep_cache_disk: [
+ # Check for constraints without keep_cache_disk
+ # (containers that predate the constraint)
+ nil,
+ # Containers that use keep_cache_ram instead
+ 0,
+ # The default value
+ bound_keep_cache_disk(resolved_runtime_constraints['ram']),
+ # The minimum default bound
+ bound_keep_cache_disk(0),
+ # The maximum default bound (presumably)
+ bound_keep_cache_disk(1 << 60),
+ # The requested value
+ resolved_runtime_constraints.delete('keep_cache_disk'),
+ ].uniq,
+ keep_cache_ram: [
+ # Containers that use keep_cache_disk instead
+ 0,
+ # The default value
+ Rails.configuration.Containers.DefaultKeepCacheRAM,
+ # The requested value
+ resolved_runtime_constraints.delete('keep_cache_ram'),
+ ].uniq,
+ }
+ resolved_cuda = resolved_runtime_constraints['cuda']
+ if resolved_cuda.nil? or resolved_cuda['device_count'] == 0
+ runtime_constraint_variations[:cuda] = [
+ # Check for constraints without cuda
+ # (containers that predate the constraint)
+ nil,
+ # The default "don't need CUDA" value
+ {
+ 'device_count' => 0,
+ 'driver_version' => '',
+ 'hardware_capability' => '',
+ },
+ # The requested value
+ resolved_runtime_constraints.delete('cuda')
+ ].uniq
+ end
+ reusable_runtime_constraints = hash_product(**runtime_constraint_variations)
+ .map { |v| resolved_runtime_constraints.merge(v) }
+
+ candidates = candidates.where_serialized(:runtime_constraints, reusable_runtime_constraints, md5: true, multivalue: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
log_reuse_info { "checking for state=Complete with readable output and log..." }
# Check for non-failing Running candidates and return the most likely to finish sooner.
log_reuse_info { "checking for state=Running..." }
running = candidates.where(state: Running).
- where("(runtime_status->'error') is null").
+ where("(runtime_status->'error') is null and priority > 0").
order('progress desc, started_at asc').
limit(1).first
if running
locked_or_queued = candidates.
where("state IN (?)", [Locked, Queued]).
order('state asc, priority desc, created_at asc').
- limit(1).first
- if locked_or_queued
- log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" }
- return locked_or_queued
+ limit(1)
+ if !attrs[:scheduling_parameters]['preemptible']
+ locked_or_queued = locked_or_queued.
+ where("not ((scheduling_parameters::jsonb)->>'preemptible')::boolean")
+ end
+ chosen = locked_or_queued.first
+ if chosen
+ log_reuse_info { "done, reusing container #{chosen.uuid} with state=#{chosen.state}" }
+ return chosen
else
log_reuse_info { "have no containers in Locked or Queued state" }
end
if self.state != Queued
raise LockFailedError.new("cannot lock when #{self.state}")
end
- self.update_attributes!(state: Locked)
+ self.update!(state: Locked)
end
end
if self.state != Locked
raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
end
- self.update_attributes!(state: Queued)
+ self.update!(state: Queued)
end
end
protected
+ # Bound a requested keep_cache_disk size (in bytes) to the supported
+ # range: minimum 2 GiB, maximum 32 GiB. A nil request is treated as 0
+ # and therefore bounded up to the 2 GiB minimum. Returns the clamped
+ # value unchanged when it already lies within [min_value, max_value].
+ def self.bound_keep_cache_disk(value)
+ value ||= 0
+ min_value = 2 << 30
+ max_value = 32 << 30
+ if value < min_value
+ min_value
+ elsif value > max_value
+ max_value
+ else
+ value
+ end
+ end
+
+ def self.hash_product(**kwargs)
+ # kwargs is a hash that maps parameters to an array of values.
+ # This function enumerates every possible hash where each key has one of
+ # the values from its array.
+ # The output keys are strings since that's what container hash attributes
+ # want.
+ # A nil value yields a hash without that key.
+ #
+ # Implementation note: the [[:_, nil]] seed guarantees #product is
+ # invoked on a non-empty array (so zero kwargs still yields exactly
+ # one result); the resulting :_ => nil pair — like every other
+ # nil-valued pair — is stripped by Hash#compact.
+ [[:_, nil]].product(
+ *kwargs.map { |(key, values)| [key.to_s].product(values) },
+ ).map { |param_pairs| Hash[param_pairs].compact }
+ end
+
def fill_field_defaults
self.state ||= Queued
self.environment ||= {}
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties]
- final_attrs = [:exit_code, :finished_at]
+ final_attrs = [:finished_at]
+ progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+ :log, :output, :output_properties, :exit_code]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
permitted.push :priority
when Running
- permitted.push :priority, :output_properties, *progress_attrs
+ permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
if self.state_changed?
- permitted.push :started_at, :gateway_address
+ permitted.push :started_at
end
if !self.interactive_session_started_was
permitted.push :interactive_session_started
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log, :runtime_status
+ permitted.push :finished_at, :log, :runtime_status, :cost
end
else
# each requesting CR.
return if self.final? || !saved_change_to_log?
leave_modified_by_user_alone do
- ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+ ContainerRequest.where(container_uuid: self.uuid, state: ContainerRequest::Committed).each do |cr|
cr.update_collections(container: self, collections: ['log'])
cr.save!
end
# ensure the token doesn't validate later in the same
# transaction (e.g., in a test case) by satisfying expires_at >
# transaction timestamp.
- self.auth.andand.update_attributes(expires_at: db_transaction_time)
+ self.auth.andand.update(expires_at: db_transaction_time)
self.auth = nil
return
elsif self.auth
self.with_lock do
act_as_system_user do
if self.state == Cancelled
- retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
+ # Cancelled means the container didn't run to completion.
+ # This happens either because it was cancelled by the user
+ # or because there was an infrastructure failure. We want
+ # to retry infrastructure failures automatically.
+ #
+ # Search for live container requests to determine if we
+ # should retry the container.
+ retryable_requests = ContainerRequest.
+ joins('left outer join containers as requesting_container on container_requests.requesting_container_uuid = requesting_container.uuid').
+ where("container_requests.container_uuid = ? and "+
+ "container_requests.priority > 0 and "+
+ "container_requests.owner_uuid not in (select group_uuid from trashed_groups) and "+
+ "(requesting_container.priority is null or (requesting_container.state = 'Running' and requesting_container.priority > 0)) and "+
+ "container_requests.state = 'Committed' and "+
+ "container_requests.container_count < container_requests.container_count_max", uuid).
+ order('container_requests.uuid asc')
else
retryable_requests = []
end
if retryable_requests.any?
+ scheduling_parameters = {
+ # partitions: empty if any are empty, else the union of all parameters
+ "partitions": retryable_requests
+ .map { |req| req.scheduling_parameters["partitions"] || [] }
+ .reduce { |cur, new| (cur.empty? or new.empty?) ? [] : (cur | new) },
+
+ # preemptible: true if all are true, else false
+ "preemptible": retryable_requests
+ .map { |req| req.scheduling_parameters["preemptible"] }
+ .all?,
+
+ # supervisor: true if any are true, else false
+ "supervisor": retryable_requests
+ .map { |req| req.scheduling_parameters["supervisor"] }
+ .any?,
+
+ # max_run_time: 0 if any are 0 (unlimited), else the maximum
+ "max_run_time": retryable_requests
+ .map { |req| req.scheduling_parameters["max_run_time"] || 0 }
+ .reduce do |cur, new|
+ if cur == 0 or new == 0
+ 0
+ elsif new > cur
+ new
+ else
+ cur
+ end
+ end,
+ }
+
c_attrs = {
command: self.command,
cwd: self.cwd,
container_image: self.container_image,
mounts: self.mounts,
runtime_constraints: self.runtime_constraints,
- scheduling_parameters: self.scheduling_parameters,
+ scheduling_parameters: scheduling_parameters,
secret_mounts: prev_secret_mounts,
runtime_token: prev_runtime_token,
runtime_user_uuid: self.runtime_user_uuid,
cr.with_lock do
leave_modified_by_user_alone do
# Use row locking because this increments container_count
+ cr.cumulative_cost += self.cost + self.subrequests_cost
cr.container_uuid = c.uuid
cr.save!
end
# Cancel outstanding container requests made by this container.
ContainerRequest.
- includes(:container).
where(requesting_container_uuid: uuid,
- state: ContainerRequest::Committed).each do |cr|
+ state: ContainerRequest::Committed).
+ in_batches(of: 15).each_record do |cr|
leave_modified_by_user_alone do
- cr.update_attributes!(priority: 0)
- cr.container.reload
- if cr.container.state == Container::Queued || cr.container.state == Container::Locked
+ cr.set_priority_zero
+ container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
+ if container_state == Container::Queued || container_state == Container::Locked
# If the child container hasn't started yet, finalize the
# child CR now instead of leaving it "on hold", i.e.,
# Queued with priority 0. (OTOH, if the child is already
# running, leave it alone so it can get cancelled the
# usual way, get a copy of the log collection, etc.)
- cr.update_attributes!(state: ContainerRequest::Final)
+ cr.update!(state: ContainerRequest::Final)
end
end
end