X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/82fa37ac01169178f6a9b1c142926de7b50e8841..2204a8d9305c85d2f7d65621a66443e7104c5f6b:/services/api/app/models/container.rb diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb index 52f1cba723..edcb8501a4 100644 --- a/services/api/app/models/container.rb +++ b/services/api/app/models/container.rb @@ -1,4 +1,10 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + +require 'log_reuse_info' require 'whitelist_update' +require 'safe_json' class Container < ArvadosModel include HasUuid @@ -6,6 +12,8 @@ class Container < ArvadosModel include CommonApiTemplate include WhitelistUpdate extend CurrentApiClient + extend DbCurrentTime + extend LogReuseInfo serialize :environment, Hash serialize :mounts, Hash @@ -16,6 +24,7 @@ class Container < ArvadosModel before_validation :fill_field_defaults, :if => :new_record? before_validation :set_timestamps validates :command, :container_image, :output_path, :cwd, :priority, :presence => true + validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 } validate :validate_state_change validate :validate_change validate :validate_lock @@ -23,6 +32,7 @@ class Container < ArvadosModel after_validation :assign_auth before_save :sort_serialized_attrs after_save :handle_completed + after_save :propagate_priority has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid @@ -65,6 +75,10 @@ class Container < ArvadosModel Running => [Complete, Cancelled] } + def self.limit_index_columns_read + ["mounts"] + end + def state_transitions State_transitions end @@ -81,92 +95,231 @@ class Container < ArvadosModel end end - def self.find_reusable(attrs) - candidates = Container. - where('command = ?', attrs[:command].to_yaml). - where('cwd = ?', attrs[:cwd]). - where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml). - where('output_path = ?', attrs[:output_path]). - where('container_image = ?', attrs[:container_image]). - where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml). - where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml) - - # Check for Completed candidates that had consistent outputs. - completed = candidates.where(state: Complete).where(exit_code: 0) - outputs = completed.select('output').group('output').limit(2) - if outputs.count.count != 1 - Rails.logger.debug("Found #{outputs.count.length} different outputs") - elsif Collection. - readable_by(current_user). - where(portable_data_hash: outputs.first.output). - count < 1 - Rails.logger.info("Found reusable container(s) " + - "but output #{outputs.first} is not readable " + - "by user #{current_user.uuid}") - else - # Return the oldest eligible container whose log is still - # present and readable by current_user. - readable_pdh = Collection. - readable_by(current_user). - select('portable_data_hash') - completed = completed. - where("log in (#{readable_pdh.to_sql})"). - order('finished_at asc'). - limit(1) - if completed.first - return completed.first + def propagate_priority + if self.priority_changed? + act_as_system_user do + # Update the priority of child container requests to match new priority + # of the parent container. + ContainerRequest.where(requesting_container_uuid: self.uuid, + state: ContainerRequest::Committed).each do |cr| + cr.priority = self.priority + cr.save + end + end + end + end + + # Create a new container (or find an existing one) to satisfy the + # given container request. + def self.resolve(req) + c_attrs = { + command: req.command, + cwd: req.cwd, + environment: req.environment, + output_path: req.output_path, + container_image: resolve_container_image(req.container_image), + mounts: resolve_mounts(req.mounts), + runtime_constraints: resolve_runtime_constraints(req.runtime_constraints), + scheduling_parameters: req.scheduling_parameters, + } + act_as_system_user do + if req.use_existing && (reusable = find_reusable(c_attrs)) + reusable else - Rails.logger.info("Found reusable container(s) but none with a log " + - "readable by user #{current_user.uuid}") + Container.create!(c_attrs) end end + end + + # Return a runtime_constraints hash that complies with requested but + # is suitable for saving in a container record, i.e., has specific + # values instead of ranges. + # + # Doing this as a step separate from other resolutions, like "git + # revision range to commit hash", makes sense only when there is no + # opportunity to reuse an existing container (e.g., container reuse + # is not implemented yet, or we have already found that no existing + # containers are suitable). + def self.resolve_runtime_constraints(runtime_constraints) + rc = {} + defaults = { + 'keep_cache_ram' => + Rails.configuration.container_default_keep_cache_ram, + } + defaults.merge(runtime_constraints).each do |k, v| + if v.is_a? Array + rc[k] = v[0] + else + rc[k] = v + end + end + rc + end + + # Return a mounts hash suitable for a Container, i.e., with every + # readonly collection UUID resolved to a PDH. + def self.resolve_mounts(mounts) + c_mounts = {} + mounts.each do |k, mount| + mount = mount.dup + c_mounts[k] = mount + if mount['kind'] != 'collection' + next + end + if (uuid = mount.delete 'uuid') + c = Collection. + readable_by(current_user). + where(uuid: uuid). + select(:portable_data_hash). + first + if !c + raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found" + end + if mount['portable_data_hash'].nil? + # PDH not supplied by client + mount['portable_data_hash'] = c.portable_data_hash + elsif mount['portable_data_hash'] != c.portable_data_hash + # UUID and PDH supplied by client, but they don't agree + raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request" + end + end + end + return c_mounts + end + + # Return a container_image PDH suitable for a Container. + def self.resolve_container_image(container_image) + coll = Collection.for_latest_docker_image(container_image) + if !coll + raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found" + end + coll.portable_data_hash + end + + def self.find_reusable(attrs) + log_reuse_info { "starting with #{Container.all.count} container records in database" } + candidates = Container.where_serialized(:command, attrs[:command]) + log_reuse_info(candidates) { "after filtering on command #{attrs[:command].inspect}" } + + candidates = candidates.where('cwd = ?', attrs[:cwd]) + log_reuse_info(candidates) { "after filtering on cwd #{attrs[:cwd].inspect}" } + + candidates = candidates.where_serialized(:environment, attrs[:environment]) + log_reuse_info(candidates) { "after filtering on environment #{attrs[:environment].inspect}" } + + candidates = candidates.where('output_path = ?', attrs[:output_path]) + log_reuse_info(candidates) { "after filtering on output_path #{attrs[:output_path].inspect}" } + + image = resolve_container_image(attrs[:container_image]) + candidates = candidates.where('container_image = ?', image) + log_reuse_info(candidates) { "after filtering on container_image #{image.inspect} (resolved from #{attrs[:container_image].inspect})" } + + candidates = candidates.where_serialized(:mounts, resolve_mounts(attrs[:mounts])) + log_reuse_info(candidates) { "after filtering on mounts #{attrs[:mounts].inspect}" } + + candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints])) + log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" } + + log_reuse_info { "checking for state=Complete with readable output and log..." } + + select_readable_pdh = Collection. + readable_by(current_user). + select(:portable_data_hash). + to_sql + + usable = candidates.where(state: Complete, exit_code: 0) + log_reuse_info(usable) { "with state=Complete, exit_code=0" } + + usable = usable.where("log IN (#{select_readable_pdh})") + log_reuse_info(usable) { "with readable log" } + + usable = usable.where("output IN (#{select_readable_pdh})") + log_reuse_info(usable) { "with readable output" } + + usable = usable.order('finished_at ASC').limit(1).first + if usable + log_reuse_info { "done, reusing container #{usable.uuid} with state=Complete" } + return usable + end # Check for Running candidates and return the most likely to finish sooner. + log_reuse_info { "checking for state=Running..." } running = candidates.where(state: Running). - order('progress desc, started_at asc').limit(1).first - return running if not running.nil? + order('progress desc, started_at asc'). + limit(1).first + if running + log_reuse_info { "done, reusing container #{running.uuid} with state=Running" } + return running + else + log_reuse_info { "have no containers in Running state" } + end # Check for Locked or Queued ones and return the most likely to start first. - locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]). - order('state asc, priority desc, created_at asc').limit(1).first - return locked_or_queued if not locked_or_queued.nil? + locked_or_queued = candidates. + where("state IN (?)", [Locked, Queued]). + order('state asc, priority desc, created_at asc'). + limit(1).first + if locked_or_queued + log_reuse_info { "done, reusing container #{locked_or_queued.uuid} with state=#{locked_or_queued.state}" } + return locked_or_queued + else + log_reuse_info { "have no containers in Locked or Queued state" } + end - # No suitable candidate found. + log_reuse_info { "done, no reusable container found" } nil end + def check_lock_fail + if self.state != Queued + raise LockFailedError.new("cannot lock when #{self.state}") + elsif self.priority <= 0 + raise LockFailedError.new("cannot lock when priority<=0") + end + end + def lock - with_lock do - if self.state == Locked - raise AlreadyLockedError + # Check invalid state transitions once before getting the lock + # (because it's cheaper that way) and once after getting the lock + # (because state might have changed while acquiring the lock). + check_lock_fail + transaction do + begin + reload(lock: 'FOR UPDATE NOWAIT') + rescue + raise LockFailedError.new("cannot lock: other transaction in progress") end - self.state = Locked - self.save! + check_lock_fail + update_attributes!(state: Locked) + end + end + + def check_unlock_fail + if self.state != Locked + raise InvalidStateTransitionError.new("cannot unlock when #{self.state}") + elsif self.locked_by_uuid != current_api_client_authorization.uuid + raise InvalidStateTransitionError.new("locked by a different token") end end def unlock - with_lock do - if self.state == Queued - raise InvalidStateTransitionError - end - self.state = Queued - self.save! + # Check invalid state transitions twice (see lock) + check_unlock_fail + transaction do + reload(lock: 'FOR UPDATE') + check_unlock_fail + update_attributes!(state: Queued) end end def self.readable_by(*users_list) - if users_list.select { |u| u.is_admin }.any? - return self + # Load optional keyword arguments, if they exist. + if users_list.last.is_a? Hash + kwargs = users_list.pop + else + kwargs = {} end - user_uuids = users_list.map { |u| u.uuid } - uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) } - uuid_list.uniq! - permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))" - joins(:container_requests). - where("container_requests.uuid IN #{permitted} OR "+ - "container_requests.owner_uuid IN (:uuids)", - uuids: uuid_list) + Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists) end def final? @@ -181,7 +334,7 @@ class Container < ArvadosModel self.runtime_constraints ||= {} self.mounts ||= {} self.cwd ||= "." - self.priority ||= 1 + self.priority ||= 0 self.scheduling_parameters ||= {} end @@ -248,7 +401,7 @@ class Container < ArvadosModel when Running permitted.push :finished_at, :output, :log when Queued, Locked - permitted.push :finished_at + permitted.push :finished_at, :log end else @@ -285,9 +438,9 @@ class Container < ArvadosModel # have access to just by setting the output field to the collection PDH. if output_changed? c = Collection. - readable_by(current_user). - where(portable_data_hash: self.output). - first + readable_by(current_user, {include_trash: true}). + where(portable_data_hash: self.output). + first if !c errors.add :output, "collection must exist and be readable by current user." end @@ -341,7 +494,7 @@ class Container < ArvadosModel act_as_system_user do if self.state == Cancelled - retryable_requests = ContainerRequest.where("priority > 0 and state = 'Committed' and container_count < container_count_max") + retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid) else retryable_requests = [] end