require 'log_reuse_info'
require 'whitelist_update'
require 'safe_json'
+require 'update_priorities'
class Container < ArvadosModel
include ArvadosModelUpdates
serialize :runtime_constraints, Hash
serialize :command, Array
serialize :scheduling_parameters, Hash
+ serialize :output_glob, Array
after_find :fill_container_defaults_after_find
before_validation :fill_field_defaults, :if => :new_record?
before_save :clear_runtime_status_when_queued
after_save :update_cr_logs
after_save :handle_completed
- after_save :propagate_priority
- has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
- belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
+ has_many :container_requests,
+ class_name: 'ContainerRequest',
+ foreign_key: 'container_uuid',
+ primary_key: 'uuid'
+ belongs_to :auth,
+ class_name: 'ApiClientAuthorization',
+ foreign_key: 'auth_uuid',
+ primary_key: 'uuid',
+ optional: true
api_accessible :user, extend: :common do |t|
t.add :command
t.add :mounts
t.add :output
t.add :output_path
+ t.add :output_glob
t.add :priority
t.add :progress
t.add :runtime_constraints
# Recalculate this container's priority, then call reload on this
# record.
#
# The priority of a user-submitted request is a function of the
# user-assigned priority and the request creation time.
#
# NOTE(review): the calculation is delegated to update_priorities,
# presumably provided by the 'update_priorities' library required by
# this file -- confirm where it is defined and what it updates.
def update_priority!
- return if ![Queued, Locked, Running].include?(state)
- p = ContainerRequest.
- where('container_uuid=? and priority>0', uuid).
- select("priority, requesting_container_uuid, created_at").
- lock(true).
- map do |cr|
- if cr.requesting_container_uuid
- Container.where(uuid: cr.requesting_container_uuid).pluck(:priority).first
- else
- (cr.priority << 50) - (cr.created_at.to_time.to_f * 1000).to_i
- end
- end.max || 0
- update_attributes!(priority: p)
- end
-
- def propagate_priority
- return true unless saved_change_to_priority?
- act_as_system_user do
- # Update the priority of child container requests to match new
- # priority of the parent container (ignoring requests with no
- # container assigned, because their priority doesn't matter).
- ContainerRequest.
- where('requesting_container_uuid = ? and state = ? and container_uuid is not null',
- self.uuid, ContainerRequest::Committed).
- pluck(:container_uuid).each do |container_uuid|
- Container.find_by_uuid(container_uuid).update_priority!
- end
- end
+ update_priorities uuid
+ reload
end
# Create a new container (or find an existing one) to satisfy the
cwd: req.cwd,
environment: req.environment,
output_path: req.output_path,
+ output_glob: req.output_glob,
container_image: resolve_container_image(req.container_image),
mounts: resolve_mounts(req.mounts),
runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
candidates = candidates.where('output_path = ?', attrs[:output_path])
log_reuse_info(candidates) { "after filtering on output_path #{attrs[:output_path].inspect}" }
+ candidates = candidates.where_serialized(:output_glob, attrs[:output_glob], md5: true)
+ log_reuse_info(candidates) { "after filtering on output_glob #{attrs[:output_glob].inspect}" }
+
image = resolve_container_image(attrs[:container_image])
candidates = candidates.where('container_image = ?', image)
log_reuse_info(candidates) { "after filtering on container_image #{image.inspect} (resolved from #{attrs[:container_image].inspect})" }
resolved_runtime_constraints.delete('cuda')
].uniq
end
- reusable_runtime_constraints = hash_product(runtime_constraint_variations)
+ reusable_runtime_constraints = hash_product(**runtime_constraint_variations)
.map { |v| resolved_runtime_constraints.merge(v) }
candidates = candidates.where_serialized(:runtime_constraints, reusable_runtime_constraints, md5: true, multivalue: true)
# Check for non-failing Running candidates and return the most likely to finish sooner.
log_reuse_info { "checking for state=Running..." }
running = candidates.where(state: Running).
- where("(runtime_status->'error') is null").
+ where("(runtime_status->'error') is null and priority > 0").
order('progress desc, started_at asc').
limit(1).first
if running
locked_or_queued = candidates.
where("state IN (?)", [Locked, Queued]).
order('state asc, priority desc, created_at asc').
- limit(1).first
- if locked_or_queued
- log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" }
- return locked_or_queued
+ limit(1)
+ if !attrs[:scheduling_parameters]['preemptible']
+ locked_or_queued = locked_or_queued.
+ where("not ((scheduling_parameters::jsonb)->>'preemptible')::boolean")
+ end
+ chosen = locked_or_queued.first
+ if chosen
+ log_reuse_info { "done, reusing container #{chosen.uuid} with state=#{chosen.state}" }
+ return chosen
else
log_reuse_info { "have no containers in Locked or Queued state" }
end
if self.state != Queued
raise LockFailedError.new("cannot lock when #{self.state}")
end
- self.update_attributes!(state: Locked)
+ self.update!(state: Locked)
end
end
if self.state != Locked
raise InvalidStateTransitionError.new("cannot unlock when #{self.state}")
end
- self.update_attributes!(state: Queued)
+ self.update!(state: Queued)
end
end
self.environment ||= {}
self.runtime_constraints ||= {}
self.mounts ||= {}
+ self.output_glob ||= []
self.cwd ||= "."
self.priority ||= 0
self.scheduling_parameters ||= {}
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
- :environment, :mounts, :output_path, :priority,
- :runtime_constraints, :scheduling_parameters,
- :secret_mounts, :runtime_token,
- :runtime_user_uuid, :runtime_auth_scopes,
- :output_storage_classes)
+ :environment, :mounts, :output_path, :output_glob,
+ :priority, :runtime_constraints,
+ :scheduling_parameters, :secret_mounts,
+ :runtime_token, :runtime_user_uuid,
+ :runtime_auth_scopes, :output_storage_classes)
end
case self.state
# each requesting CR.
return if self.final? || !saved_change_to_log?
leave_modified_by_user_alone do
- ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+ ContainerRequest.where(container_uuid: self.uuid, state: ContainerRequest::Committed).each do |cr|
cr.update_collections(container: self, collections: ['log'])
cr.save!
end
# ensure the token doesn't validate later in the same
# transaction (e.g., in a test case) by satisfying expires_at >
# transaction timestamp.
- self.auth.andand.update_attributes(expires_at: db_transaction_time)
+ self.auth.andand.update(expires_at: db_transaction_time)
self.auth = nil
return
elsif self.auth
self.with_lock do
act_as_system_user do
if self.state == Cancelled
- retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
+ # Cancelled means the container didn't run to completion.
+ # This happens either because it was cancelled by the user
+ # or because there was an infrastructure failure. We want
+ # to retry infrastructure failures automatically.
+ #
+ # Search for live container requests to determine if we
+ # should retry the container.
+ retryable_requests = ContainerRequest.
+ joins('left outer join containers as requesting_container on container_requests.requesting_container_uuid = requesting_container.uuid').
+ where("container_requests.container_uuid = ? and "+
+ "container_requests.priority > 0 and "+
+ "container_requests.owner_uuid not in (select group_uuid from trashed_groups) and "+
+ "(requesting_container.priority is null or (requesting_container.state = 'Running' and requesting_container.priority > 0)) and "+
+ "container_requests.state = 'Committed' and "+
+ "container_requests.container_count < container_requests.container_count_max", uuid).
+ order('container_requests.uuid asc')
else
retryable_requests = []
end
cwd: self.cwd,
environment: self.environment,
output_path: self.output_path,
+ output_glob: self.output_glob,
container_image: self.container_image,
mounts: self.mounts,
runtime_constraints: self.runtime_constraints,
ContainerRequest.
where(requesting_container_uuid: uuid,
state: ContainerRequest::Committed).
- find_in_batches(batch_size: 10) do |batch|
- batch.each do |cr|
- leave_modified_by_user_alone do
- cr.set_priority_zero
- container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
- if container_state == Container::Queued || container_state == Container::Locked
- # If the child container hasn't started yet, finalize the
- # child CR now instead of leaving it "on hold", i.e.,
- # Queued with priority 0. (OTOH, if the child is already
- # running, leave it alone so it can get cancelled the
- # usual way, get a copy of the log collection, etc.)
- cr.update_attributes!(state: ContainerRequest::Final)
- end
+ in_batches(of: 15).each_record do |cr|
+ leave_modified_by_user_alone do
+ cr.set_priority_zero
+ container_state = Container.where(uuid: cr.container_uuid).pluck(:state).first
+ if container_state == Container::Queued || container_state == Container::Locked
+ # If the child container hasn't started yet, finalize the
+ # child CR now instead of leaving it "on hold", i.e.,
+ # Queued with priority 0. (OTOH, if the child is already
+ # running, leave it alone so it can get cancelled the
+ # usual way, get a copy of the log collection, etc.)
+ cr.update!(state: ContainerRequest::Final)
end
end
end