# already know how to properly treat them.
attribute :properties, :jsonbHash, default: {}
attribute :secret_mounts, :jsonbHash, default: {}
# output_storage_classes takes its default from the cluster-wide
# DefaultStorageClasses setting; the default is supplied as a lambda
# rather than a literal value.
+ attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
# Columns declared with serialize as Hash values.
serialize :environment, Hash
serialize :mounts, Hash
serialize :scheduling_parameters, Hash
# Lifecycle callbacks follow, in registration order.
after_find :fill_container_defaults_after_find
+ after_initialize { @state_was_when_initialized = self.state_was } # see finalize_if_needed
# fill_field_defaults only runs for new records; fill_container_defaults
# runs before every validation.
before_validation :fill_field_defaults, :if => :new_record?
before_validation :fill_container_defaults
- before_validation :set_default_preemptible_scheduling_parameter
- before_validation :set_container
# Validations: required fields, then numeric ranges (output_ttl >= 0,
# priority within 0..1000), then custom checks.
validates :command, :container_image, :output_path, :cwd, :presence => true
validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
validate :check_update_whitelist
validate :secret_mounts_key_conflict
validate :validate_runtime_token
- before_save :scrub_secrets
# after_validation hooks, registered in this order: scrub_secrets,
# set_preemptible, set_container.
+ after_validation :scrub_secrets
+ after_validation :set_preemptible
+ after_validation :set_container
before_create :set_requesting_container_uuid
before_destroy :set_priority_zero
after_save :update_priority
t.add :scheduling_parameters
t.add :state
t.add :use_existing
+ t.add :output_storage_classes
+ t.add :output_properties
+ t.add :cumulative_cost
end
# Supported states for a container request
:container_image, :cwd, :environment, :filters, :mounts,
:output_path, :priority, :runtime_token,
:runtime_constraints, :state, :container_uuid, :use_existing,
- :scheduling_parameters, :secret_mounts, :output_name, :output_ttl]
+ :scheduling_parameters, :secret_mounts, :output_name, :output_ttl,
+ :output_storage_classes, :output_properties]
+
# Report whether at least one entry of the configured InstanceTypes has
# a truthy "Preemptible" value. The entry key (k) is not used.
+ def self.any_preemptible_instances?
+ Rails.configuration.InstanceTypes.any? do |k, v|
+ v["Preemptible"]
+ end
+ end
def self.limit_index_columns_read
["mounts"]
def finalize!
container = Container.find_by_uuid(container_uuid)
if !container.nil?
- update_collections(container: container)
+ # We don't want to add the container cost if the container was
+ # already finished when this CR was committed. But we are
+ # running in an after_save hook after a lock/reload, so
+ # state_was has already been updated to Committed regardless.
+ # Hence the need for @state_was_when_initialized.
+ if @state_was_when_initialized == Committed
+ # Add the final container cost to our cumulative cost (which
+ # may already be non-zero from previous attempts if
+ # container_count_max > 1).
+ self.cumulative_cost += container.cost + container.subrequests_cost
+ end
+ # Add our cumulative cost to the subrequests_cost of the
+ # requesting container, if any.
+ if self.requesting_container_uuid
+ Container.where(
+ uuid: self.requesting_container_uuid,
+ state: Container::Running,
+ ).each do |c|
+ c.subrequests_cost += self.cumulative_cost
+ c.save!
+ end
+ end
+
+ update_collections(container: container)
+ # update_collections makes a log collection that includes all of the logs
+ # for all of the containers associated with this request. For requests
+ # that are retried, this is the primary way users can get logs for
+ # failed containers.
+ # The code below makes a log collection that is a verbatim copy of the
+ # container's logs. This is required for container reuse: a container
+ # will not be reused if the owner cannot read a collection with its logs.
+ # See the "readable log" section of Container.find_reusable().
if container.state == Container::Complete
log_col = Collection.where(portable_data_hash: container.log).first
if log_col
'container_uuid' => container_uuid,
},
portable_data_hash: log_col.portable_data_hash,
- manifest_text: log_col.manifest_text)
+ manifest_text: log_col.manifest_text,
+ storage_classes_desired: self.output_storage_classes
+ )
completed_coll.save_with_unique_name!
end
end
owner_uuid: self.owner_uuid,
name: coll_name,
manifest_text: "",
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
+ storage_classes_desired: self.output_storage_classes)
end
if out_type == "log"
manifest = dst.manifest_text
end
+ merged_properties = {}
+ merged_properties['container_request'] = uuid
+
+ if out_type == 'output' and !requesting_container_uuid.nil?
+ # output of a child process, give it "intermediate" type by
+ # default.
+ merged_properties['type'] = 'intermediate'
+ else
+ merged_properties['type'] = out_type
+ end
+
+ if out_type == "output"
+ merged_properties.update(container.output_properties)
+ merged_properties.update(self.output_properties)
+ end
+
coll.assign_attributes(
portable_data_hash: Digest::MD5.hexdigest(manifest) + '+' + manifest.bytesize.to_s,
manifest_text: manifest,
trash_at: trash_at,
- delete_at: trash_at)
+ delete_at: trash_at,
+ properties: merged_properties)
coll.save_with_unique_name!
self.send(out_type + '_uuid=', coll.uuid)
end
end
# Return the superclass's full-text searchable columns with the columns
# named in the array literal removed.
def self.full_text_searchable_columns
- super - ["mounts", "secret_mounts", "secret_mounts_md5", "runtime_token"]
+ super - ["mounts", "secret_mounts", "secret_mounts_md5", "runtime_token", "output_storage_classes"]
+ end
+
# Persist priority 0 via update_attributes!, but only when the current
# priority is greater than zero and the state is not Final. Registered
# as a before_destroy callback in this class.
# NOTE(review): `self.priority > 0` would raise if priority were nil;
# presumably the numericality validation rules that out -- confirm.
+ def set_priority_zero
+ self.update_attributes!(priority: 0) if self.priority > 0 && self.state != Final
end
protected
errors.add :container_uuid, "can only be updated to nil."
return false
end
+ if self.container_count_changed?
+ errors.add :container_count, "cannot be updated directly."
+ return false
+ end
if state_changed? and state == Committed and container_uuid.nil?
+ if self.command.length > 0 and self.command[0] == "arvados-cwl-runner"
+ # Special case, arvados-cwl-runner processes are always considered "supervisors"
+ self.scheduling_parameters['supervisor'] = true
+ end
while true
c = Container.resolve(self)
c.lock!
end
end
if self.container_uuid != self.container_uuid_was
- if self.container_count_changed?
- errors.add :container_count, "cannot be updated directly."
- return false
- end
-
self.container_count += 1
return if self.container_uuid_was.nil?
- old_container = Container.find_by_uuid(self.container_uuid_was)
- return if old_container.nil?
+ old_container_uuid = self.container_uuid_was
+ old_container_log = Container.where(uuid: old_container_uuid).pluck(:log).first
+ return if old_container_log.nil?
- old_logs = Collection.where(portable_data_hash: old_container.log).first
+ old_logs = Collection.where(portable_data_hash: old_container_log).first
return if old_logs.nil?
log_coll = self.log_uuid.nil? ? nil : Collection.where(uuid: self.log_uuid).first
log_coll = Collection.new(
owner_uuid: self.owner_uuid,
name: coll_name = "Container log for request #{uuid}",
- manifest_text: "")
+ manifest_text: "",
+ storage_classes_desired: self.output_storage_classes)
end
# copy logs from old container into CR's log collection
src = Arv::Collection.new(old_logs.manifest_text)
dst = Arv::Collection.new(log_coll.manifest_text)
- dst.cp_r("./", "log for container #{old_container.uuid}", src)
+ dst.cp_r("./", "log for container #{old_container_uuid}", src)
manifest = dst.manifest_text
log_coll.assign_attributes(
end
end
- def set_default_preemptible_scheduling_parameter
- if Rails.configuration.Containers.UsePreemptibleInstances && state == Committed && get_requesting_container()
# Set scheduling_parameters['preemptible'] to true when ALL of these hold:
#   * the record is new or its state has changed,
#   * the state is Committed,
#   * Containers.AlwaysUsePreemptibleInstances is enabled in the config,
#   * get_requesting_container_uuid() returns a value,
#   * at least one configured instance type is preemptible.
# Otherwise scheduling_parameters is left untouched. Registered as an
# after_validation callback in this class.
+ def set_preemptible
+ if (new_record? || state_changed?) &&
+ state == Committed &&
+ Rails.configuration.Containers.AlwaysUsePreemptibleInstances &&
+ get_requesting_container_uuid() &&
+ self.class.any_preemptible_instances?
self.scheduling_parameters['preemptible'] = true
end
end
"[#{k}]=#{v.inspect} must be a positive integer")
end
end
+ if runtime_constraints['cuda']
+ ['device_count'].each do |k|
+ v = runtime_constraints['cuda'][k]
+ if !v.is_a?(Integer) || v < 0
+ errors.add(:runtime_constraints,
+ "[cuda.#{k}]=#{v.inspect} must be a positive or zero integer")
+ end
+ end
+ ['driver_version', 'hardware_capability'].each do |k|
+ v = runtime_constraints['cuda'][k]
+ if !v.is_a?(String) || (runtime_constraints['cuda']['device_count'] > 0 && v.to_f == 0.0)
+ errors.add(:runtime_constraints,
+ "[cuda.#{k}]=#{v.inspect} must be a string in format 'X.Y'")
+ end
+ end
+ end
end
end
scheduling_parameters['partitions'].size)
errors.add :scheduling_parameters, "partitions must be an array of strings"
end
- if !Rails.configuration.Containers.UsePreemptibleInstances and scheduling_parameters['preemptible']
- errors.add :scheduling_parameters, "preemptible instances are not allowed"
+ if scheduling_parameters['preemptible'] &&
+ (new_record? || state_changed?) &&
+ !self.class.any_preemptible_instances?
+ errors.add :scheduling_parameters, "preemptible instances are not configured in InstanceTypes"
end
if scheduling_parameters.include? 'max_run_time' and
(!scheduling_parameters['max_run_time'].is_a?(Integer) ||
if self.new_record? || self.state_was == Uncommitted
# Allow create-and-commit in a single operation.
permitted.push(*AttrsPermittedBeforeCommit)
+ elsif mounts_changed? && mounts_was.keys.sort == mounts.keys.sort
+ # Ignore the updated mounts if the only changes are default/zero
+ # values as added by controller, see 17774
+ only_defaults = true
+ mounts.each do |path, mount|
+ (mount.to_a - mounts_was[path].to_a).each do |k, v|
+ if ![0, "", false, nil].index(v)
+ only_defaults = false
+ end
+ end
+ end
+ if only_defaults
+ clear_attribute_change("mounts")
+ end
end
case self.state
when Committed
- permitted.push :priority, :container_count_max, :container_uuid
-
- if self.container_uuid.nil?
- self.errors.add :container_uuid, "has not been resolved to a container."
- end
+ permitted.push :priority, :container_count_max, :container_uuid, :cumulative_cost
if self.priority.nil?
self.errors.add :priority, "cannot be nil"
end
- # Allow container count to increment by 1
- if (self.container_uuid &&
- self.container_uuid != self.container_uuid_was &&
- self.container_count == 1 + (self.container_count_was || 0))
- permitted.push :container_count
- end
+ # Allow container count to increment (not by client, only by us
+ # -- see set_container)
+ permitted.push :container_count
if current_user.andand.is_admin
permitted.push :log_uuid
when Final
if self.state_was == Committed
# "Cancel" means setting priority=0, state=Committed
- permitted.push :priority
+ permitted.push :priority, :cumulative_cost
if current_user.andand.is_admin
permitted.push :output_uuid, :log_uuid
end
end
- def set_priority_zero
- self.update_attributes!(priority: 0) if self.state != Final
- end
-
# Store the result of get_requesting_container_uuid() in
# requesting_container_uuid. When that result is truthy, also set this
# request's priority to the highest priority among container requests
# whose container_uuid equals it, or 0 if there is no such maximum.
# Registered as a before_create callback in this class.
def set_requesting_container_uuid
- c = get_requesting_container()
- if !c.nil?
- self.requesting_container_uuid = c.uuid
# The assignment inside the condition is intentional: the attribute is
# written first, then the branch is taken only if a UUID was found.
+ if (self.requesting_container_uuid = get_requesting_container_uuid())
# Determine the priority of container request for the requesting
# container.
self.priority = ContainerRequest.where(container_uuid: self.requesting_container_uuid).maximum("priority") || 0
end
end
- def get_requesting_container
- return self.requesting_container_uuid if !self.requesting_container_uuid.nil?
- Container.for_current_token
# Return requesting_container_uuid when it is already set; otherwise the
# uuid of whatever Container.for_current_token returns, or nil when that
# call returns nil (andand guards the .uuid call).
# NOTE(review): Container.for_current_token presumably looks up the
# container tied to the current API token -- confirm in Container.
+ def get_requesting_container_uuid
+ return self.requesting_container_uuid || Container.for_current_token.andand.uuid
end
end