"kind": "tmp"
}
}
+ scheduling_parameters = {}
dirs = set()
for f in self.pathmapper.files():
partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
if partition_req:
- runtime_constraints["partition"] = aslist(partition_req["partition"])
+ scheduling_parameters["partitions"] = aslist(partition_req["partition"])
container_request["mounts"] = mounts
container_request["runtime_constraints"] = runtime_constraints
container_request["use_existing"] = kwargs.get("enable_reuse", True)
+ container_request["scheduling_parameters"] = scheduling_parameters
try:
response = self.arvrunner.api.container_requests().create(
'output_path': '/var/spool/cwl',
'container_image': '99999999999999999999999999999993+99',
'command': ['ls', '/var/spool/cwl'],
- 'cwd': '/var/spool/cwl'
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {}
})
# The test passes some fields in builder.resources
make_fs_access=make_fs_access, tmpdir="/tmp"):
j.run()
- runner.api.container_requests().create.assert_called_with(
- body={
+ call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+ call_body_expected = {
'environment': {
'HOME': '/var/spool/cwl',
'TMPDIR': '/tmp'
'vcpus': 3,
'ram': 3145728000,
'keep_cache_ram': 512,
- 'API': True,
- 'partition': ['blurb']
+ 'API': True
},
'use_existing': True,
'priority': 1,
'output_path': '/var/spool/cwl',
'container_image': '99999999999999999999999999999993+99',
'command': ['ls'],
- 'cwd': '/var/spool/cwl'
- })
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {
+ 'partitions': ['blurb']
+ }
+ }
+
+ call_body = call_kwargs.get('body', None)
+ self.assertNotEqual(None, call_body)
+ for key in call_body:
+ self.assertEqual(call_body_expected.get(key), call_body.get(key))
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
// Container is an arvados#container resource.
type Container struct {
- UUID string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- LockedByUUID string `json:"locked_by_uuid"`
- Mounts map[string]Mount `json:"mounts"`
- Output string `json:"output"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State ContainerState `json:"state"`
+ UUID string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ LockedByUUID string `json:"locked_by_uuid"`
+ Mounts map[string]Mount `json:"mounts"`
+ Output string `json:"output"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ State ContainerState `json:"state"`
+ SchedulingParameters SchedulingParameters `json:"scheduling_parameters"` // scheduling settings such as partitions; kept separate from RuntimeConstraints
}
// Mount is special behavior to attach to a filesystem path or device.
// CPU) and network connectivity.
type RuntimeConstraints struct {
API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int `json:"keep_cache_ram"`
- Partition []string `json:"partition"`
+ RAM int `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int `json:"keep_cache_ram"`
+}
+
+// SchedulingParameters holds a container's scheduling settings, which are
+// carried separately from RuntimeConstraints; currently only Partitions.
+type SchedulingParameters struct {
+ Partitions []string `json:"partitions"` // partition names; note the plural JSON key "partitions"
}
// ContainerList is an arvados#containerList resource.
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
before_validation :set_timestamps
t.add :started_at
t.add :state
t.add :auth_uuid
+ t.add :scheduling_parameters
end
# Supported states for a container
self.mounts ||= {}
self.cwd ||= "."
self.priority ||= 1
+ self.scheduling_parameters ||= {}
end
def permission_to_create
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
:environment, :mounts, :output_path, :priority,
- :runtime_constraints)
+ :runtime_constraints, :scheduling_parameters)
end
case self.state
if self.runtime_constraints_changed?
self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
end
+ if self.scheduling_parameters_changed?
+ self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
+ end
end
def handle_completed
output_path: self.output_path,
container_image: self.container_image,
mounts: self.mounts,
- runtime_constraints: self.runtime_constraints
+ runtime_constraints: self.runtime_constraints,
+ scheduling_parameters: self.scheduling_parameters
}
c = Container.create! c_attrs
retryable_requests.each do |cr|
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
before_validation :validate_runtime_constraints
+ before_validation :validate_scheduling_parameters
before_validation :set_container
validates :command, :container_image, :output_path, :cwd, :presence => true
validate :validate_state_change
t.add :runtime_constraints
t.add :state
t.add :use_existing
+ t.add :scheduling_parameters
end
# Supported states for a container request
self.mounts ||= {}
self.cwd ||= "."
self.container_count_max ||= Rails.configuration.container_count_max
+ self.scheduling_parameters ||= {}
end
# Create a new container (or find an existing one) to satisfy this
if not reusable.nil?
reusable
else
+ c_attrs[:scheduling_parameters] = self.scheduling_parameters
Container.create!(c_attrs)
end
end
end
end
+ def validate_scheduling_parameters
+ if self.state == Committed
+ if scheduling_parameters.include? 'partitions' and
+ (!scheduling_parameters['partitions'].is_a?(Array) ||
+ scheduling_parameters['partitions'].reject{|x| !x.is_a?(String)}.size !=
+ scheduling_parameters['partitions'].size)
+ errors.add :scheduling_parameters, "partitions must be an array of strings"
+ end
+ end
+ end
+
def validate_change
permitted = [:owner_uuid]
:container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :priority,
:properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing
+ :state, :container_uuid, :use_existing, :scheduling_parameters
when Committed
if container_uuid.nil?
permitted.push :command, :container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :properties,
:requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid
+ :state, :container_uuid, :scheduling_parameters
end
when Final
--- /dev/null
+class AddSchedulingParametersToContainer < ActiveRecord::Migration
+ def change
+ add_column :containers, :scheduling_parameters, :text
+ add_column :container_requests, :scheduling_parameters, :text
+ end
+end
filters text,
updated_at timestamp without time zone NOT NULL,
container_count integer DEFAULT 0,
- use_existing boolean DEFAULT true
+ use_existing boolean DEFAULT true,
+ scheduling_parameters text
);
updated_at timestamp without time zone NOT NULL,
exit_code integer,
auth_uuid character varying(255),
- locked_by_uuid character varying(255)
+ locked_by_uuid character varying(255),
+ scheduling_parameters text
);
INSERT INTO schema_migrations (version) VALUES ('20160926194129');
-INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
+
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
end
end
end
+
+ [
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Uncommitted],
+ [{"partitions" => ["fastcpu","vfastcpu"]}, ContainerRequest::Committed],
+ ].each do |sp, state, expected|
+ test "create container request with scheduling_parameters #{sp} in state #{state} and verify #{expected}" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: sp,
+ mounts: {"test" => {"kind" => "json"}}}
+ set_user_from_auth :active
+
+ if expected == ActiveRecord::RecordInvalid
+ assert_raises(ActiveRecord::RecordInvalid) do
+ create_minimal_req!(common_attrs.merge({state: state}))
+ end
+ else
+ cr = create_minimal_req!(common_attrs.merge({state: state}))
+ assert_equal sp, cr.scheduling_parameters
+
+ if state == ContainerRequest::Committed
+ c = Container.find_by_uuid(cr.container_uuid)
+ assert_equal sp, c.scheduling_parameters
+ end
+ end
+ end
+ end
end
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
- if container.RuntimeConstraints.Partition != nil {
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.RuntimeConstraints.Partition, ",")))
+ if container.SchedulingParameters.Partitions != nil {
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
return exec.Command("sbatch", sbatchArgs...)
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
theConfig.SbatchArguments = nil
- container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1, Partition: []string{"blurb", "b2"}}}
+ container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
sbatchCmd := sbatchFunc(container)
var expected []string