Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
#distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
debian8,debian9,centos7|python-gflags|2.0|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
-debian8,debian9,ubuntu1404,ubuntu1604,centos7|apache-libcloud|2.3.0|3|python|all|--depends 'python-requests >= 2.4.3'
debian8,debian9,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
debian8,debian9,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
debian8,debian9,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.3.0
+LIBCLOUD_PIN=2.3.1.dev1
-using_fork=false
+using_fork=true
if [[ $using_fork = true ]]; then
LIBCLOUD_PIN_SRC="https://github.com/curoverse/libcloud/archive/apache-libcloud-$LIBCLOUD_PIN.zip"
else
fpm_build $WORKSPACE/tools/crunchstat-summary ${PYTHON2_PKG_PREFIX}-crunchstat-summary 'Curoverse, Inc.' 'python' "$crunchstat_summary_version" "--url=https://arvados.org" "--description=Crunchstat-summary reads Arvados Crunch log files and summarize resource usage" --iteration "$iteration"
fi
-## if libcloud becomes our own fork see
-## https://dev.arvados.org/issues/12268#note-27
+# Forked libcloud
+if test_package_presence "$PYTHON2_PKG_PREFIX"-apache-libcloud "$LIBCLOUD_PIN" python 2
+then
+  # Build the pinned curoverse fork of libcloud from a throwaway checkout.
+  LIBCLOUD_DIR=$(mktemp -d)
+  (
+      # Subshell keeps the cd and the DASHQ_UNLESS_DEBUG override local.
+      cd $LIBCLOUD_DIR
+      git clone $DASHQ_UNLESS_DEBUG https://github.com/curoverse/libcloud.git .
+      git checkout $DASHQ_UNLESS_DEBUG apache-libcloud-$LIBCLOUD_PIN
+      # libcloud is absurdly noisy without -q, so force -q here
+      OLD_DASHQ_UNLESS_DEBUG=$DASHQ_UNLESS_DEBUG
+      DASHQ_UNLESS_DEBUG=-q
+      handle_python_package
+      DASHQ_UNLESS_DEBUG=$OLD_DASHQ_UNLESS_DEBUG
+  )
+
+  # libcloud >= 2.3.0 now requires python-requests 2.4.3 or higher, otherwise
+  # it throws
+  # ImportError: No module named packages.urllib3.poolmanager
+  # when loaded. We only see this problem on ubuntu1404, because that is our
+  # only supported distribution that ships with a python-requests older than
+  # 2.4.3.
+  fpm_build $LIBCLOUD_DIR "$PYTHON2_PKG_PREFIX"-apache-libcloud "" python "" --iteration 2 --depends 'python-requests >= 2.4.3'
+  # Clean up the temporary checkout once the package is built.
+  rm -rf $LIBCLOUD_DIR
+fi
# Python 2 dependencies
declare -a PIP_DOWNLOAD_SWITCHES=(--no-deps)
case it.Scratch < needScratch:
case it.RAM < needRAM:
case it.VCPUs < needVCPUs:
+ case it.Preemptable != ctr.SchedulingParameters.Preemptable:
case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
// Equal price, but worse specs
default:
c.Check(best.Scratch >= 2*GiB, check.Equals, true)
}
}
+
+// TestChoosePreemptable checks that when a container's scheduling
+// parameters request a preemptable instance, ChooseInstanceType picks the
+// cheapest preemptable type satisfying the runtime constraints — the
+// equally priced, equally sized non-preemptable "almost best" entry must
+// lose to the preemptable "best" entry.
+func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
+	menu := []arvados.InstanceType{
+		{Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptable: true, Name: "costly"},
+		{Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
+		{Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Preemptable: true, Name: "best"},
+		{Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Preemptable: true, Name: "small"},
+	}
+	best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+		Mounts: map[string]arvados.Mount{
+			"/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+		},
+		RuntimeConstraints: arvados.RuntimeConstraints{
+			VCPUs:        2,
+			RAM:          987654321,
+			KeepCacheRAM: 123456789,
+		},
+		SchedulingParameters: arvados.SchedulingParameters{
+			Preemptable: true,
+		},
+	})
+	c.Check(err, check.IsNil)
+	c.Check(best.Name, check.Equals, "best")
+	c.Check(best.RAM >= 1234567890, check.Equals, true)
+	c.Check(best.VCPUs >= 2, check.Equals, true)
+	c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+	c.Check(best.Preemptable, check.Equals, true)
+}
RAM int64
Scratch int64
Price float64
+ Preemptable bool
}
// GetThisSystemNode returns a SystemNode for the node we're running
// SchedulingParameters specify a container's scheduling parameters
// such as Partitions
type SchedulingParameters struct {
-	Partitions []string `json:"partitions"`
+	// Partitions the container is allowed to run on, if any.
+	Partitions []string `json:"partitions"`
+	// Preemptable, if true, allows running the container on a
+	// preemptable cloud instance (e.g. AWS Spot Instance).
+	Preemptable bool `json:"preemptable"`
}
// ContainerList is an arvados#containerList resource.
before_validation :fill_field_defaults, :if => :new_record?
before_validation :validate_runtime_constraints
- before_validation :validate_scheduling_parameters
before_validation :set_container
+ before_validation :set_default_preemptable_scheduling_parameter
validates :command, :container_image, :output_path, :cwd, :presence => true
validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
+ validate :validate_scheduling_parameters
validate :validate_state_change
validate :check_update_whitelist
validate :secret_mounts_key_conflict
end
end
+  # before_validation callback: when a child container request (one made by
+  # a running container) is being committed and the API server has
+  # preemptable_instances enabled, default scheduling_parameters['preemptable']
+  # to true unless the caller already set it explicitly.
+  def set_default_preemptable_scheduling_parameter
+    if self.state == Committed
+      # If preemptable instances (e.g. AWS Spot Instances) are allowed,
+      # ask them on child containers by default.
+      if Rails.configuration.preemptable_instances and
+        !self.requesting_container_uuid.nil? and
+        self.scheduling_parameters['preemptable'].nil?
+        self.scheduling_parameters['preemptable'] = true
+      end
+    end
+  end
+
def validate_runtime_constraints
case self.state
when Committed
scheduling_parameters['partitions'].size)
errors.add :scheduling_parameters, "partitions must be an array of strings"
end
+ if !Rails.configuration.preemptable_instances and scheduling_parameters['preemptable']
+ errors.add :scheduling_parameters, "preemptable instances are not allowed"
+ end
end
end
### Crunch, DNS & compute node management
###
+ # Preemptable instance support (e.g. AWS Spot Instances)
+ # When true, child containers will get created with the preemptable
+  # scheduling parameter set.
+ preemptable_instances: false
+
# Docker image to be used when none found in runtime_constraints of a job
default_docker_image_for_jobs: false
assert_equal ContainerRequest::Final, cr3.state
end
+ [
+ [false, ActiveRecord::RecordInvalid],
+ [true, nil],
+ ].each do |preemptable_conf, expected|
+ test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, create preemptable container request and verify #{expected}" do
+ sp = {"preemptable" => true}
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: sp,
+ mounts: {"test" => {"kind" => "json"}}}
+ Rails.configuration.preemptable_instances = preemptable_conf
+ set_user_from_auth :active
+
+ cr = create_minimal_req!(common_attrs)
+ cr.state = ContainerRequest::Committed
+
+ if !expected.nil?
+ assert_raises(expected) do
+ cr.save!
+ end
+ else
+ cr.save!
+ assert_equal sp, cr.scheduling_parameters
+ end
+ end
+ end
+
+ [
+ 'zzzzz-dz642-runningcontainr',
+ nil,
+ ].each do |requesting_c|
+ test "having preemptable instances active on the API server, a committed #{requesting_c.nil? ? 'non-':''}child CR should not ask for preemptable instance if parameter already set to false" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: {"preemptable" => false},
+ mounts: {"test" => {"kind" => "json"}}}
+
+ Rails.configuration.preemptable_instances = true
+ set_user_from_auth :active
+
+ if requesting_c
+ cr = with_container_auth(Container.find_by_uuid requesting_c) do
+ create_minimal_req!(common_attrs)
+ end
+ assert_not_nil cr.requesting_container_uuid
+ else
+ cr = create_minimal_req!(common_attrs)
+ end
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ assert_equal false, cr.scheduling_parameters['preemptable']
+ end
+ end
+
+ [
+ [true, 'zzzzz-dz642-runningcontainr', true],
+ [true, nil, nil],
+ [false, 'zzzzz-dz642-runningcontainr', nil],
+ [false, nil, nil],
+ ].each do |preemptable_conf, requesting_c, schedule_preemptable|
+ test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, #{requesting_c.nil? ? 'non-':''}child CR should #{schedule_preemptable ? '':'not'} ask for preemptable instance by default" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ mounts: {"test" => {"kind" => "json"}}}
+
+ Rails.configuration.preemptable_instances = preemptable_conf
+ set_user_from_auth :active
+
+ if requesting_c
+ cr = with_container_auth(Container.find_by_uuid requesting_c) do
+ create_minimal_req!(common_attrs)
+ end
+ assert_not_nil cr.requesting_container_uuid
+ else
+ cr = create_minimal_req!(common_attrs)
+ end
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ assert_equal schedule_preemptable, cr.scheduling_parameters['preemptable']
+ end
+ end
+
[
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
def get_state(self):
"""Get node state, one of ['unpaired', 'busy', 'idle', 'down']."""
+ # If this node's size is invalid (because it has a stale arvados_node_size
+ # tag), return 'down' so that it's properly shut down.
+ if self.cloud_node.size.id == 'invalid':
+ return 'down'
+
# If this node is not associated with an Arvados node, return
# 'unpaired' if we're in the boot grace period, and 'down' if not,
# so it isn't counted towards usable nodes.
try:
kwargs = self.create_kwargs.copy()
kwargs.update(self.arvados_create_kwargs(size, arvados_node))
- kwargs['size'] = size
+ kwargs['size'] = size.real
return self.real.create_node(**kwargs)
except CLOUD_ERRORS as create_error:
# Workaround for bug #6702: sometimes the create node request
def _init_image(self, urn):
return "image", self.get_image(urn)
+    def create_node(self, size, arvados_node):
+        # Set up tag indicating the Arvados assigned Cloud Size id.
+        # setdefault guards against a configuration with no ex_tags at all;
+        # list_nodes() later reads this tag back to recover the size.
+        self.create_kwargs.setdefault('ex_tags', {})
+        self.create_kwargs['ex_tags'].update({'arvados_node_size': size.id})
+        return super(ComputeNodeDriver, self).create_node(size, arvados_node)
+
def list_nodes(self):
# Azure only supports filtering node lists by resource group.
# Do our own filtering based on tag.
# Need to populate Node.size
if not n.size:
n.size = self.sizes[n.extra["properties"]["hardwareProfile"]["vmSize"]]
+ n.extra['arvados_node_size'] = n.extra.get('tags', {}).get('arvados_node_size')
return nodes
def broken(self, cloud_node):
"VolumeSize": volsize,
"VolumeType": "gp2"
}}]
+ if size.preemptable:
+ # Request a Spot instance for this node
+ kw['ex_spot_market'] = True
return kw
def sync_node(self, cloud_node, arvados_node):
self.real.ex_create_tags(cloud_node,
{'Name': arvados_node_fqdn(arvados_node)})
+ def create_node(self, size, arvados_node):
+ # Set up tag indicating the Arvados assigned Cloud Size id.
+ self.create_kwargs['ex_metadata'].update({'arvados_node_size': size.id})
+ return super(ComputeNodeDriver, self).create_node(size, arvados_node)
+
    def list_nodes(self):
        # Need to populate Node.size
        nodes = super(ComputeNodeDriver, self).list_nodes()
        for n in nodes:
            if not n.size:
                n.size = self.sizes[n.extra["instance_type"]]
            # Surface the arvados_node_size tag for the size lookup done by
            # the node monitor; None when the node was created without one.
            n.extra['arvados_node_size'] = n.extra.get('metadata', {}).get('arvados_node_size')
        return nodes
@classmethod
return result
+ def create_node(self, size, arvados_node):
+ # Set up tag indicating the Arvados assigned Cloud Size id.
+ self.create_kwargs['ex_metadata'].update({'arvados_node_size': size.id})
+ return super(ComputeNodeDriver, self).create_node(size, arvados_node)
+
def list_nodes(self):
# The GCE libcloud driver only supports filtering node lists by zone.
# Do our own filtering based on tag list.
if nodelist and not hasattr(nodelist[0].size, 'id'):
for node in nodelist:
node.size = self._sizes_by_id[node.size]
+ node.extra['arvados_node_size'] = node.extra.get('metadata', {}).get('arvados_node_size')
return nodelist
@classmethod
from .baseactor import BaseNodeManagerActor
+from functools import partial
from libcloud.common.types import LibcloudError
from libcloud.common.exceptions import BaseHTTPError
if not self.has_option(sec_name, opt_name):
self.set(sec_name, opt_name, value)
- def get_section(self, section, transformer=None):
+ def get_section(self, section, transformers={}, default_transformer=None):
+ transformer_map = {
+ int: self.getint,
+ bool: self.getboolean,
+ float: self.getfloat,
+ }
result = self._dict()
for key, value in self.items(section):
+ transformer = None
+ if transformers.get(key) in transformer_map:
+ transformer = partial(transformer_map[transformers[key]], section)
+ elif default_transformer in transformer_map:
+ transformer = partial(transformer_map[default_transformer], section)
if transformer is not None:
try:
- value = transformer(value)
+ value = transformer(key)
except (TypeError, ValueError):
pass
result[key] = value
self.get_section('Cloud Create'),
driver_class=driver_class)
-    def node_sizes(self, all_sizes):
+    def node_sizes(self):
        """Finds all acceptable NodeSizes for our installation.
        Returns a list of (NodeSize, kwargs) pairs for each NodeSize object
        returned by libcloud that matches a size listed in our config file.
        """
-
+        # Query the cloud for the available sizes instead of requiring
+        # callers to pass them in.
+        all_sizes = self.new_cloud_client().list_sizes()
        size_kwargs = {}
+        # Per-key type coercions for "Size ..." config sections; other keys
+        # fall back to the int default transformer below.
+        section_types = {
+            'price': float,
+            'preemptable': bool,
+        }
        for sec_name in self.sections():
            sec_words = sec_name.split(None, 2)
            if sec_words[0] != 'Size':
                continue
-            size_spec = self.get_section(sec_name, int)
-            if 'price' in size_spec:
-                size_spec['price'] = float(size_spec['price'])
+            size_spec = self.get_section(sec_name, section_types, int)
+            if 'preemptable' not in size_spec:
+                size_spec['preemptable'] = False
+            if 'instance_type' not in size_spec:
+                # Assume instance type is Size name if missing
+                size_spec['instance_type'] = sec_words[1]
+            # The section name doubles as the Arvados-assigned size id.
+            size_spec['id'] = sec_words[1]
            size_kwargs[sec_words[1]] = size_spec
        # EC2 node sizes are identified by id. GCE sizes are identified by name.
        matching_sizes = []
        for size in all_sizes:
-            if size.id in size_kwargs:
-                matching_sizes.append((size, size_kwargs[size.id]))
-            elif size.name in size_kwargs:
-                matching_sizes.append((size, size_kwargs[size.name]))
+            # NOTE(review): several config sections may share one
+            # instance_type, yielding multiple entries per cloud size
+            # (e.g. a preemptable and a non-preemptable variant).
+            matching_sizes += [
+                (size, size_kwargs[s]) for s in size_kwargs
+                if size_kwargs[s]['instance_type'] == size.id
+                or size_kwargs[s]['instance_type'] == size.name
+            ]
        return matching_sizes
def shutdown_windows(self):
that would best satisfy the jobs, choosing the cheapest size that
satisfies each job, and ignoring jobs that can't be satisfied.
"""
+class InvalidCloudSize(object):
+    """
+    Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+    have a recognizable arvados_node_size tag.
+    """
+    def __init__(self):
+        # 'invalid' is a sentinel id; node state logic treats nodes with
+        # this size as 'down' so they get shut down.
+        self.id = 'invalid'
+        self.name = 'invalid'
+        self.ram = 0
+        self.disk = 0
+        self.scratch = 0
+        self.cores = 0
+        self.bandwidth = 0
+        # Absurdly high price so this size is never selected as cheapest.
+        self.price = 9999999
+        self.preemptable = False
+        self.extra = {}
+
+    def meets_constraints(self, **kwargs):
+        # Never satisfies any job, so such nodes don't count as capacity.
+        return False
+
class CloudSizeWrapper(object):
def __init__(self, real_size, node_mem_scaling, **kwargs):
self.disk = 0
self.scratch = self.disk * 1000
self.ram = int(self.ram * node_mem_scaling)
+ self.preemptable = False
for name, override in kwargs.iteritems():
+ if name == 'instance_type': continue
if not hasattr(self, name):
raise ValueError("unrecognized size field '%s'" % (name,))
setattr(self, name, override)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
+ # EC2 node sizes are identified by id. GCE sizes are identified by name.
for size in self.cloud_sizes:
if (size.meets_constraints(**wants) and
- (specified_size is None or size.id == specified_size)):
- return size
+ (specified_size is None or
+ size.id == specified_size or size.name == specified_size)):
+ return size
return None
def servers_for_queue(self, queue):
"Job's min_nodes constraint is greater than the configured "
"max_nodes (%d)" % self.max_nodes)
elif (want_count*cloud_size.price <= self.max_price):
- servers.extend([cloud_size.real] * want_count)
+ servers.extend([cloud_size] * want_count)
else:
unsatisfiable_jobs[job['uuid']] = (
"Job's price (%d) is above system's max_price "
for s in self.cloud_sizes:
if s.id == sizeid:
return s
- return None
+ return InvalidCloudSize()
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
return root_logger
def build_server_calculator(config):
- cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
+ cloud_size_list = config.node_sizes()
if not cloud_size_list:
abort("No valid node sizes configured")
return ServerCalculator(cloud_size_list,
    def _send_request(self):
        nodes = self._client.list_nodes()
        for n in nodes:
-            # Replace with libcloud NodeSize object with compatible
+            # Replace the libcloud NodeSize object with compatible
            # CloudSizeWrapper object which merges the size info reported from
            # the cloud with size information from the configuration file.
+            # When the arvados_node_size tag is missing or stale, find_size
+            # returns an InvalidCloudSize rather than None.
-            n.size = self._calculator.find_size(n.size.id)
+            n.size = self._calculator.find_size(n.extra['arvados_node_size'])
        return nodes
global all_nodes, create_calls
create_calls += 1
nodeid = "node%i" % create_calls
+ if ex_tags is None:
+ ex_tags = {}
+ ex_tags.update({'arvados_node_size': size.id})
n = Node(nodeid, nodeid, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
all_nodes.append(n)
if ex_customdata:
ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0]
if ex_userdata:
ping_url = ex_userdata
- if ex_metadata:
+ elif ex_metadata:
ping_url = ex_metadata["arv-ping-url"]
ping_url += "&instance_id=" + nodeid
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
auth=auth,
ex_metadata=ex_metadata,
ex_userdata=ex_userdata)
- n.extra = {"launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1]}
+ n.extra = {
+ "launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1],
+ "metadata" : {
+ "arvados_node_size": size.id
+ }
+ }
return n
def list_sizes(self, **kwargs):
ex_metadata=ex_metadata)
n.extra = {
"metadata": {
- "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()]
+ "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()],
+ "arvados_node_size": size.id
},
"zone": "fake"
}
'python-daemon',
'setuptools'
],
+ dependency_links=[
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.3.1.dev1.zip"
+ ],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud>=2.3',
+ 'apache-libcloud>=2.3.1.dev1',
],
zip_safe=False
)
for an_error, is_cloud_error in errors:
self.driver_mock().create_node.side_effect = an_error
with self.assertRaises(an_error):
- driver.create_node('1', 'id_1')
+ driver.create_node(testutil.MockSize(1), 'id_1')
if is_cloud_error:
error_count += 1
self.assertEqual(error_count, status.tracker.get('create_node_errors'))
driver.create_node(testutil.MockSize(1), arv_node)
create_method = self.driver_mock().create_node
self.assertTrue(create_method.called)
- self.assertEqual(
- {'test':'testvalue'},
- create_method.call_args[1].get('ex_metadata', {'arg': 'missing'})
+ self.assertIn(
+ ('test', 'testvalue'),
+ create_method.call_args[1].get(
+ 'ex_metadata',
+ {'arg': 'missing'}).items()
)
def test_hostname_from_arvados_node(self):
def test_list_sizes(self):
config = self.load_config()
- client = config.new_cloud_client()
- sizes = config.node_sizes(client.list_sizes())
+ sizes = config.node_sizes()
self.assertEqual(1, len(sizes))
size, kwargs = sizes[0]
self.assertEqual('Small', size.name)
self.public_ips = []
self.size = testutil.MockSize(1)
self.state = 0
+ self.extra = {'arvados_node_size': self.size.id}
def build_monitor(self, side_effect, *args, **kwargs):
self.bandwidth = 16 * factor
self.price = float(factor)
self.extra = {}
+ self.real = self
+ self.preemptable = False
def __eq__(self, other):
return self.id == other.id