t.add :state
t.add :auth_uuid
t.add :scheduling_parameters
+ t.add :runtime_user_uuid
+ t.add :runtime_auth_scopes
end
# Supported states for a container
if mount['kind'] != 'collection'
next
end
- if (uuid = mount.delete 'uuid')
+
+ uuid = mount.delete 'uuid'
+
+ if mount['portable_data_hash'].nil? and !uuid.nil?
+ # PDH not supplied, try by UUID
c = Collection.
readable_by(current_user).
where(uuid: uuid).
if !c
raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
end
- if mount['portable_data_hash'].nil?
- # PDH not supplied by client
- mount['portable_data_hash'] = c.portable_data_hash
- elsif mount['portable_data_hash'] != c.portable_data_hash
- # UUID and PDH supplied by client, but they don't agree
- raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
- end
+ mount['portable_data_hash'] = c.portable_data_hash
end
end
return c_mounts
candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
- candidates = candidates.where('runtime_user_uuid = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
- attrs[:runtime_user_uuid])
- log_reuse_info(candidates) { "after filtering on runtime_user_uuid #{attrs[:runtime_user_uuid].inspect}" }
-
- candidates = candidates.where('runtime_auth_scopes = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
- SafeJSON.dump(attrs[:runtime_auth_scopes]))
- log_reuse_info(candidates) { "after filtering on runtime_auth_scopes #{attrs[:runtime_auth_scopes].inspect}" }
-
log_reuse_info { "checking for state=Complete with readable output and log..." }
select_readable_pdh = Collection.
transaction do
reload
check_lock_fail
- update_attributes!(state: Locked)
+ # Count every successful lock (dispatch attempt) so unlock can cancel
+ # the container once max_container_dispatch_attempts is exceeded.
+ update_attributes!(state: Locked, lock_count: self.lock_count+1)
end
end
transaction do
reload(lock: 'FOR UPDATE')
check_unlock_fail
- update_attributes!(state: Queued)
+ # Re-queue the container unless it has already been dispatched (locked)
+ # the maximum number of times; in that case give up and cancel it with
+ # an explanatory runtime_status error.
+ if self.lock_count < Rails.configuration.max_container_dispatch_attempts
+ update_attributes!(state: Queued)
+ else
+ update_attributes!(state: Cancelled,
+ runtime_status: {
+ error: "Container exceeded 'max_container_dispatch_attempts' (lock_count=#{self.lock_count})."
+ })
+ end
end
end
else
kwargs = {}
end
+ if users_list.select { |u| u.is_admin }.any?
+ return super
+ end
Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
end
end
def self.for_current_token
+ # Returns the single Container associated with the current API token, or
+ # nil when there is no current token or no matching container. (Changed
+ # from returning a relation to returning a record via .first.)
+ return if !current_api_client_authorization
+ # The container UUID, if any, is the 4th '/'-separated field of the
+ # token string (presumably a "v2/..." runtime token — confirm format).
_, _, _, container_uuid = Thread.current[:token].split('/')
if container_uuid.nil?
- Container.where(auth_uuid: current_api_client_authorization.uuid)
+ Container.where(auth_uuid: current_api_client_authorization.uuid).first
else
+ # Runtime token: match either the dispatcher-issued auth record or the
+ # container's own (uuid, runtime_token) pair.
Container.where('auth_uuid=? or (uuid=? and runtime_token=?)',
current_api_client_authorization.uuid,
container_uuid,
- current_api_client_authorization.token)
+ current_api_client_authorization.token).first
end
end
+ # NOTE: Migration 20190322174136_add_file_info_to_collection.rb relies on this function.
+ #
+ # Change with caution!
+ #
+ # Correctly groups pdhs to use for batch database updates. Helps avoid
+ # updating too many database rows in a single transaction.
+ def self.group_pdhs_for_multiple_transactions(log_prefix)
+ batch_size_max = 1 << 28 # 256 MiB
+ last_pdh = '0'
+ done = 0
+ any = true
+
+ total = ActiveRecord::Base.connection.exec_query(
+ 'SELECT DISTINCT portable_data_hash FROM collections'
+ ).rows.count
+
+ while any
+ any = false
+ pdhs = ActiveRecord::Base.connection.exec_query(
+ 'SELECT DISTINCT portable_data_hash FROM collections '\
+ "WHERE portable_data_hash > '#{last_pdh}' "\
+ 'GROUP BY portable_data_hash LIMIT 1000'
+ )
+ break if pdhs.rows.count.zero?
+
+ Container.group_pdhs_by_manifest_size(pdhs.rows, batch_size_max) do |grouped_pdhs|
+ any = true
+ yield grouped_pdhs
+ done += grouped_pdhs.size
+ last_pdh = pdhs[-1]
+ Rails.logger.info(log_prefix + ": #{done}/#{total}")
+ end
+ end
+ Rails.logger.info(log_prefix + ': finished')
+ end
+
+ # NOTE: Migration 20190322174136_add_file_info_to_collection.rb relies on this function.
+ #
+ # Change with caution!
+ #
+ # Given an array of pdhs, yield a subset array of pdhs when the total
+ # size of all manifest_texts is no more than batch_size_max. Pdhs whose manifest_text
+ # is bigger than batch_size_max are yielded by themselves
+ # pdhs: array of portable_data_hash Strings of the form "<md5>+<size>"
+ # (NOTE(review): elements must be Strings — exec_query row arrays must
+ # be unwrapped by the caller first, or split('+') raises NoMethodError).
+ # batch_size_max: byte ceiling for the summed manifest sizes of a group.
+ def self.group_pdhs_by_manifest_size(pdhs, batch_size_max)
+ batch_size = 0
+ batch_pdhs = {}
+ pdhs.each do |pdh|
+ # Manifest size is the second '+'-delimited token of the pdh.
+ manifest_size = pdh.split('+')[1].to_i
+ # Flush the current group before it would exceed batch_size_max; a
+ # single pdh larger than the ceiling is emitted as its own group.
+ if batch_size > 0 && batch_size + manifest_size > batch_size_max
+ yield batch_pdhs.keys
+ batch_pdhs = {}
+ batch_size = 0
+ end
+ batch_pdhs[pdh] = true
+ batch_size += manifest_size
+ end
+ # NOTE(review): this final yield emits an empty array when pdhs is
+ # empty — confirm callers tolerate an empty group.
+ yield batch_pdhs.keys
+ end
+
protected
def fill_field_defaults
case self.state
when Locked
- permitted.push :priority, :runtime_status, :log
+ permitted.push :priority, :runtime_status, :log, :lock_count
when Queued
permitted.push :priority
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log
+ permitted.push :finished_at, :log, :runtime_status
end
else
return false
end
- if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
- # The contained process itself can update progress indicators,
- # but can't change priority etc.
- permitted = permitted & (progress_attrs + final_attrs + [:state] - [:log])
+ if self.state == Running &&
+ !current_api_client_authorization.nil? &&
+ (current_api_client_authorization.uuid == self.auth_uuid ||
+ current_api_client_authorization.token == self.runtime_token)
+ # The contained process itself can write final attrs but can't
+ # change priority or log.
+ permitted.push *final_attrs
+ permitted = permitted - [:log, :priority]
elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
# When locked, progress fields cannot be updated by the wrong
# dispatcher, even though it has admin privileges.
if self.scheduling_parameters_changed?
self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
end
+ if self.runtime_auth_scopes_changed?
+ self.runtime_auth_scopes = self.runtime_auth_scopes.sort
+ end
end
def update_secret_mounts_md5
container_image: self.container_image,
mounts: self.mounts,
runtime_constraints: self.runtime_constraints,
- scheduling_parameters: self.scheduling_parameters
+ scheduling_parameters: self.scheduling_parameters,
+ secret_mounts: self.secret_mounts_was,
+ runtime_token: self.runtime_token_was,
+ runtime_user_uuid: self.runtime_user_uuid,
+ runtime_auth_scopes: self.runtime_auth_scopes
}
c = Container.create! c_attrs
retryable_requests.each do |cr|