X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cae94f22b760c6c6899fc4d23db15d389535ff0a..ff8d14acd42952a21f5428e96d86e4a54c41be9a:/services/api/app/models/container.rb diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb index c1c3eae94b..9420ef3cb8 100644 --- a/services/api/app/models/container.rb +++ b/services/api/app/models/container.rb @@ -1,15 +1,18 @@ require 'whitelist_update' +require 'safe_json' class Container < ArvadosModel include HasUuid include KindAndEtag include CommonApiTemplate include WhitelistUpdate + extend CurrentApiClient serialize :environment, Hash serialize :mounts, Hash serialize :runtime_constraints, Hash serialize :command, Array + serialize :scheduling_parameters, Hash before_validation :fill_field_defaults, :if => :new_record? before_validation :set_timestamps @@ -17,6 +20,7 @@ class Container < ArvadosModel validate :validate_state_change validate :validate_change validate :validate_lock + validate :validate_output after_validation :assign_auth before_save :sort_serialized_attrs after_save :handle_completed @@ -42,6 +46,7 @@ class Container < ArvadosModel t.add :started_at t.add :state t.add :auth_uuid + t.add :scheduling_parameters end # Supported states for a container @@ -77,21 +82,117 @@ class Container < ArvadosModel end end + # Create a new container (or find an existing one) to satisfy the + # given container request. + def self.resolve(req) + c_attrs = { + command: req.command, + cwd: req.cwd, + environment: req.environment, + output_path: req.output_path, + container_image: resolve_container_image(req.container_image), + mounts: resolve_mounts(req.mounts), + runtime_constraints: resolve_runtime_constraints(req.runtime_constraints), + scheduling_parameters: req.scheduling_parameters, + } + act_as_system_user do + if req.use_existing && (reusable = find_reusable(c_attrs)) + reusable + else + Container.create!(c_attrs) + end + end + end + + # Return a runtime_constraints hash that complies with requested but + # is suitable for saving in a container record, i.e., has specific + # values instead of ranges. + # + # Doing this as a step separate from other resolutions, like "git + # revision range to commit hash", makes sense only when there is no + # opportunity to reuse an existing container (e.g., container reuse + # is not implemented yet, or we have already found that no existing + # containers are suitable). + def self.resolve_runtime_constraints(runtime_constraints) + rc = {} + defaults = { + 'keep_cache_ram' => + Rails.configuration.container_default_keep_cache_ram, + } + defaults.merge(runtime_constraints).each do |k, v| + if v.is_a? Array + rc[k] = v[0] + else + rc[k] = v + end + end + rc + end + + # Return a mounts hash suitable for a Container, i.e., with every + # readonly collection UUID resolved to a PDH. + def self.resolve_mounts(mounts) + c_mounts = {} + mounts.each do |k, mount| + mount = mount.dup + c_mounts[k] = mount + if mount['kind'] != 'collection' + next + end + if (uuid = mount.delete 'uuid') + c = Collection. + readable_by(current_user). + where(uuid: uuid). + select(:portable_data_hash). + first + if !c + raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found" + end + if mount['portable_data_hash'].nil? + # PDH not supplied by client + mount['portable_data_hash'] = c.portable_data_hash + elsif mount['portable_data_hash'] != c.portable_data_hash + # UUID and PDH supplied by client, but they don't agree + raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request" + end + end + end + return c_mounts + end + + # Return a container_image PDH suitable for a Container. + def self.resolve_container_image(container_image) + coll = Collection.for_latest_docker_image(container_image) + if !coll + raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found" + end + coll.portable_data_hash + end + def self.find_reusable(attrs) candidates = Container. - where('command = ?', attrs[:command].to_yaml). + where_serialized(:command, attrs[:command]). where('cwd = ?', attrs[:cwd]). - where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml). + where_serialized(:environment, attrs[:environment]). where('output_path = ?', attrs[:output_path]). - where('container_image = ?', attrs[:container_image]). - where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml). - where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml) - - # Check for Completed candidates that only had consistent outputs. - completed = candidates.where(state: Complete).where(exit_code: 0) - if completed.select("output").group('output').limit(2).length == 1 - return completed.order('finished_at asc').limit(1).first - end + where('container_image = ?', resolve_container_image(attrs[:container_image])). + where_serialized(:mounts, resolve_mounts(attrs[:mounts])). + where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints])) + + # Check for Completed candidates whose output and log are both readable. + select_readable_pdh = Collection. + readable_by(current_user). + select(:portable_data_hash). + to_sql + usable = candidates. + where(state: Complete). + where(exit_code: 0). + where("log IN (#{select_readable_pdh})"). + where("output IN (#{select_readable_pdh})"). + order('finished_at ASC'). + limit(1). + first + return usable if usable # Check for Running candidates and return the most likely to finish sooner. running = candidates.where(state: Running). @@ -141,6 +242,10 @@ class Container < ArvadosModel uuids: uuid_list) end + def final? + [Complete, Cancelled].include?(self.state) + end + protected def fill_field_defaults @@ -150,6 +255,7 @@ class Container < ArvadosModel self.mounts ||= {} self.cwd ||= "." self.priority ||= 1 + self.scheduling_parameters ||= {} end def permission_to_create @@ -157,7 +263,23 @@ class Container < ArvadosModel end def permission_to_update - current_user.andand.is_admin + # Override base permission check to allow auth_uuid to set progress and + # output (only). Whether it is legal to set progress and output in the current + # state has already been checked in validate_change. + current_user.andand.is_admin || + (!current_api_client_authorization.nil? and + [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid) + end + + def ensure_owner_uuid_is_permitted + # Override base permission check to allow auth_uuid to set progress and + # output (only). Whether it is legal to set progress and output in the current + # state has already been checked in validate_change. + if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid + check_update_whitelist [:progress, :output] + else + super + end end def set_timestamps @@ -176,7 +298,7 @@ class Container < ArvadosModel if self.new_record? permitted.push(:owner_uuid, :command, :container_image, :cwd, :environment, :mounts, :output_path, :priority, - :runtime_constraints) + :runtime_constraints, :scheduling_parameters) end case self.state @@ -184,7 +306,7 @@ class Container < ArvadosModel permitted.push :priority when Running - permitted.push :priority, :progress + permitted.push :priority, :progress, :output if self.state_changed? permitted.push :started_at end @@ -211,20 +333,10 @@ class Container < ArvadosModel end def validate_lock - # If the Container is already locked by someone other than the - # current api_client_auth, disallow all changes -- except - # priority, which needs to change to reflect max(priority) of - # relevant ContainerRequests. - if locked_by_uuid_was - if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid - check_update_whitelist [:priority] - end - end - if [Locked, Running].include? self.state # If the Container was already locked, locked_by_uuid must not # changes. Otherwise, the current auth gets the lock. - need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid + need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid else need_lock = nil end @@ -240,6 +352,23 @@ class Container < ArvadosModel self.locked_by_uuid = need_lock end + def validate_output + # Output must exist and be readable by the current user. This is so + # that a container cannot "claim" a collection that it doesn't otherwise + # have access to just by setting the output field to the collection PDH. + if output_changed? + c = Collection.unscoped do + Collection. + readable_by(current_user). + where(portable_data_hash: self.output). + first + end + if !c + errors.add :output, "collection must exist and be readable by current user." + end + end + end + def assign_auth if self.auth_uuid_changed? return errors.add :auth_uuid, 'is readonly' @@ -275,22 +404,53 @@ class Container < ArvadosModel if self.runtime_constraints_changed? self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints) end + if self.scheduling_parameters_changed? + self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters) + end end def handle_completed # This container is finished so finalize any associated container requests # that are associated with this container. - if self.state_changed? and [Complete, Cancelled].include? self.state + if self.state_changed? and self.final? act_as_system_user do + + if self.state == Cancelled + retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid) + else + retryable_requests = [] + end + + if retryable_requests.any? + c_attrs = { + command: self.command, + cwd: self.cwd, + environment: self.environment, + output_path: self.output_path, + container_image: self.container_image, + mounts: self.mounts, + runtime_constraints: self.runtime_constraints, + scheduling_parameters: self.scheduling_parameters + } + c = Container.create! c_attrs + retryable_requests.each do |cr| + cr.with_lock do + # Use row locking because this increments container_count + cr.container_uuid = c.uuid + cr.save + end + end + end + # Notify container requests associated with this container ContainerRequest.where(container_uuid: uuid, - :state => ContainerRequest::Committed).each do |cr| - cr.container_completed! + state: ContainerRequest::Committed).each do |cr| + cr.finalize! end # Try to cancel any outstanding container requests made by this container. ContainerRequest.where(requesting_container_uuid: uuid, - :state => ContainerRequest::Committed).each do |cr| + state: ContainerRequest::Committed).each do |cr| cr.priority = 0 cr.save end