'poll_time': '60',
'max_poll_time': '300',
'poll_stale_after': '600',
+ 'max_total_price': '0',
'boot_fail_after': str(sys.maxint),
'node_stale_after': str(60 * 60 * 2)},
'Logging': {'file': '/dev/stderr',
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
cloud_nodes_actor, cloud_update_actor, timer_actor,
arvados_factory, cloud_factory,
- shutdown_windows, min_size, min_nodes, max_nodes,
+ shutdown_windows, server_calculator,
+ min_nodes, max_nodes,
poll_stale_after=600,
boot_fail_after=1800,
node_stale_after=7200,
node_setup_class=dispatch.ComputeNodeSetupActor,
node_shutdown_class=dispatch.ComputeNodeShutdownActor,
- node_actor_class=dispatch.ComputeNodeMonitorActor):
+ node_actor_class=dispatch.ComputeNodeMonitorActor,
+ max_total_price=0):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
self._logger = logging.getLogger('arvnodeman.daemon')
self._later = self.actor_ref.proxy()
self.shutdown_windows = shutdown_windows
- self.min_cloud_size = min_size
+ self.server_calculator = server_calculator
+ self.min_cloud_size = self.server_calculator.cheapest_size()
self.min_nodes = min_nodes
self.max_nodes = max_nodes
+ self.max_total_price = max_total_price
self.poll_stale_after = poll_stale_after
self.boot_fail_after = boot_fail_after
self.node_stale_after = node_stale_after
self._pair_nodes(cloud_rec, arv_node)
break
- def _nodes_up(self):
- return sum(len(nodelist) for nodelist in
- [self.cloud_nodes, self.booted, self.booting])
-
- def _nodes_busy(self):
+ def _nodes_up(self, size):
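+ # Count nodes of the given size across the booting, booted, and cloud node
+ # lists; with size=None, count nodes of every size.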
+ up = 0
+ up += sum(1
+ for c in self.booting.itervalues()
+ if size is None or c.cloud_size.get().id == size.id)
+ up += sum(1
+ for i in (self.booted, self.cloud_nodes.nodes)
+ for c in i.itervalues()
+ if size is None or (c.cloud_node.size and c.cloud_node.size.id == size.id))
+ return up
+
+ def _total_price(self):
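+ # Sum the hourly price of every node that is booting, booted, or running.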
+ cost = 0
+ cost += sum(c.cloud_size.get().price
+ for c in self.booting.itervalues())
+ cost += sum(c.cloud_node.size.price
+ for i in (self.booted, self.cloud_nodes.nodes)
+ for c in i.itervalues()
+ if c.cloud_node.size)
+ return cost
+
+ def _nodes_busy(self, size):
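+ # Count monitored nodes of the given size that report being busy.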
return sum(1 for busy in
pykka.get_all(rec.actor.in_state('busy') for rec in
- self.cloud_nodes.nodes.itervalues())
+ self.cloud_nodes.nodes.itervalues()
+ if (rec.cloud_node.size and rec.cloud_node.size.id == size.id))
if busy)
- def _nodes_missing(self):
+ def _nodes_missing(self, size):
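+ # Count nodes of the given size, excluding ones already shutting down, whose
+ # paired Arvados node record appears to have gone missing or stale.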
return sum(1 for arv_node in
pykka.get_all(rec.actor.arvados_node for rec in
self.cloud_nodes.nodes.itervalues()
- if rec.actor.cloud_node.get().id not in self.shutdowns)
+ if rec.cloud_node.size and rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
- def _nodes_wanted(self):
- up_count = self._nodes_up()
- under_min = self.min_nodes - up_count
- over_max = up_count - self.max_nodes
+ def _size_wishlist(self, size):
+ return sum(1 for c in self.last_wishlist if c.id == size.id)
+
+ def _size_shutdowns(self, size):
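+ # Count pending shutdowns for nodes of the given size.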
+ return sum(1 for c in self.shutdowns.itervalues()
+ if c.cloud_node.get().size.id == size.id)
+
+ def _nodes_wanted(self, size):
+ total_up_count = self._nodes_up(None)
+ under_min = self.min_nodes - total_up_count
+ over_max = total_up_count - self.max_nodes
+ total_price = self._total_price()
+
if over_max >= 0:
return -over_max
- elif under_min > 0:
+ elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
+
+ up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+ self._nodes_busy(size) +
+ self._nodes_missing(size))
+
+ wanted = self._size_wishlist(size) - up_count
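+ # If booting every wanted node of this size would push total spending past
+ # max_total_price, boot only as many as still fit under the cap: e.g. with a
+ # cap of 4.00/hr, a current total of 2.00/hr, and a size priced at 1.00/hr,
+ # at most two more nodes can be booted.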
+ if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
+ can_boot = int((self.max_total_price - total_price) / size.price)
+ if can_boot == 0:
+ self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
+ size.name, size.price, self.max_total_price, total_price)
+ return can_boot
else:
- up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
- return len(self.last_wishlist) - up_count
-
- def _nodes_excess(self):
- up_count = self._nodes_up() - len(self.shutdowns)
- over_min = up_count - self.min_nodes
- if over_min <= 0:
- return over_min
- else:
- return up_count - self._nodes_busy() - len(self.last_wishlist)
+ return wanted
+
+ def _nodes_excess(self, size):
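+ # Count nodes of this size that are up and not shutting down, beyond what
+ # busy nodes and the wishlist justify; min_nodes counts only against the
+ # cheapest size.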
+ up_count = self._nodes_up(size) - self._size_shutdowns(size)
+ if size.id == self.min_cloud_size.id:
+ up_count -= self.min_nodes
+ return up_count - self._nodes_busy(size) - self._size_wishlist(size)
def update_server_wishlist(self, wishlist):
self._update_poll_time('server_wishlist')
self.last_wishlist = wishlist
- nodes_wanted = self._nodes_wanted()
- if nodes_wanted > 0:
- self._later.start_node()
- elif (nodes_wanted < 0) and self.booting:
- self._later.stop_booting_node()
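+ # cloud_sizes is sorted by ascending price, so iterate in reverse to
+ # consider the most expensive sizes first.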
+ for sz in reversed(self.server_calculator.cloud_sizes):
+ size = sz.real
+ nodes_wanted = self._nodes_wanted(size)
+ if nodes_wanted > 0:
+ self._later.start_node(size)
+ elif (nodes_wanted < 0) and self.booting:
+ self._later.stop_booting_node(size)
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
return wrapper
@_check_poll_freshness
- def start_node(self):
- nodes_wanted = self._nodes_wanted()
+ def start_node(self, cloud_size):
+ nodes_wanted = self._nodes_wanted(cloud_size)
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- try:
- cloud_size = self.last_wishlist[self._nodes_up()]
- except IndexError:
- cloud_size = self.min_cloud_size
self._logger.info("Want %s more nodes. Booting a %s node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
time.time())
new_setup.subscribe(self._later.node_up)
if nodes_wanted > 1:
- self._later.start_node()
+ self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
return pykka.get_all([getattr(actor, name) for name in attr_names])
self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
- def stop_booting_node(self):
- nodes_excess = self._nodes_excess()
+ def stop_booting_node(self, size):
+ nodes_excess = self._nodes_excess(size)
if (nodes_excess < 1) or not self.booting:
return None
for key, node in self.booting.iteritems():
- if node.stop_if_no_cloud_node().get():
+ if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
if nodes_excess > 1:
- self._later.stop_booting_node()
+ self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
- if self._nodes_excess() > 0:
+ if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
self._begin_node_shutdown(node_actor, cancellable=True)
def shutdown_unpaired_node(self, cloud_node_id):
return True
- def __init__(self, server_list, max_nodes=None):
+ def __init__(self, server_list, max_nodes=None, max_price=None):
self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
for s, kws in server_list]
self.cloud_sizes.sort(key=lambda s: s.price)
self.max_nodes = max_nodes or float('inf')
+ self.max_price = max_price or float('inf')
self.logger = logging.getLogger('arvnodeman.jobqueue')
self.logged_jobs = set()
if job['uuid'] not in self.logged_jobs:
self.logged_jobs.add(job['uuid'])
self.logger.debug("job %s not satisfiable", job['uuid'])
- elif (want_count <= self.max_nodes):
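+ # Ignore the job if booting want_count nodes of this size would by itself
+ # exceed max_price.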
+ elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
servers.extend([cloud_size.real] * max(1, want_count))
self.logged_jobs.intersection_update(seen_jobs)
return servers
if not cloud_size_list:
abort("No valid node sizes configured")
return ServerCalculator(cloud_size_list,
- config.getint('Daemon', 'max_nodes'))
+ config.getint('Daemon', 'max_nodes'),
+ config.getfloat('Daemon', 'max_total_price'))
def launch_pollers(config, server_calculator):
poll_time = config.getint('Daemon', 'poll_time')
cloud_node_updater, timer,
config.new_arvados_client, config.new_cloud_client,
config.shutdown_windows(),
- server_calculator.cheapest_size(),
+ server_calculator,
config.getint('Daemon', 'min_nodes'),
config.getint('Daemon', 'max_nodes'),
config.getint('Daemon', 'poll_stale_after'),
config.getint('Daemon', 'boot_fail_after'),
config.getint('Daemon', 'node_stale_after'),
- node_setup, node_shutdown, node_monitor).proxy()
+ node_setup, node_shutdown, node_monitor,
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
signal.pause()
daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
# through SLURM before shutting them down.
#dispatcher = slurm
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times. If Node Manager needs to start new idle nodes in order to satisfy
+# min_nodes, it will use the cheapest node type. However, depending on usage
+# patterns, it may also satisfy min_nodes by keeping some more expensive nodes
+# alive.
min_nodes = 0
# Node Manager will not start any compute nodes when at least this
# many are running.
max_nodes = 8
+# Upper limit on the rate of spending (in $/hr). Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold. The default of 0 means no limit.
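+# For example, with max_total_price = 2.24 and the Standard_D3 size below
+# ($0.56/hr), Node Manager will run at most four nodes at a time.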
+max_total_price = 0
+
# Poll Azure nodes and Arvados for new information every N seconds.
poll_time = 60
# the API server to ping
ping_host = hostname:port
-[Size Standard_D3]
-# You can define any number of Size sections to list Azure sizes you're
-# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use. The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue. You must also provide the price per hour, as the Azure compute
+# driver currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
# Each size section MUST define the number of cores available in this
# size class (since libcloud does not provide any consistent API for exposing
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs. You can also override Microsoft's provided
-# data fields by setting the same names here.
+# data fields by setting them here.
+
+[Size Standard_D3]
cores = 4
-scratch = 200
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
# through SLURM before shutting them down.
#dispatcher = slurm
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times. If Node Manager needs to start new idle nodes in order to satisfy
+# min_nodes, it will use the cheapest node type. However, depending on usage
+# patterns, it may also satisfy min_nodes by keeping some more expensive nodes
+# alive.
min_nodes = 0
# Node Manager will not start any compute nodes when at least this
# many are running.
max_nodes = 8
+# Upper limit on the rate of spending (in $/hr). Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold. The default of 0 means no limit.
+max_total_price = 0
+
# Poll EC2 nodes and Arvados for new information every N seconds.
poll_time = 60
# compute node.
security_groups = idstring1, idstring2
-[Size t2.medium]
+
# You can define any number of Size sections to list EC2 sizes you're
# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
+#
# Each size section MUST define the number of cores available in this
# size class (since libcloud does not provide any consistent API for exposing
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs. You can also override Amazon's provided
-# data fields by setting the same names here.
+# data fields (such as price per hour) by setting them here.
+
+[Size m4.large]
cores = 2
+price = 0.126
+scratch = 100
+
+[Size m4.xlarge]
+cores = 4
+price = 0.252
scratch = 100
# All times are in seconds unless specified otherwise.
[Daemon]
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times. If Node Manager needs to start new idle nodes in order to satisfy
+# min_nodes, it will use the cheapest node type. However, depending on usage
+# patterns, it may also satisfy min_nodes by keeping some more expensive nodes
+# alive.
min_nodes = 0
# Node Manager will not start any compute nodes when at least this
# many are running.
max_nodes = 8
# Poll compute nodes and Arvados for new information every N seconds.
poll_time = 60
+# Upper limit on the rate of spending (in $/hr). Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold. The default of 0 means no limit.
+max_total_price = 0
+
# Polls have exponential backoff when services fail to respond.
# This is the longest time to wait between polls.
max_poll_time = 300
# See http://libcloud.readthedocs.org/en/latest/compute/drivers/gce.html#specifying-service-account-scopes
# service_accounts = [{'email':'account@example.com', 'scopes':['storage-ro']}]
-[Size n1-standard-2]
+
# You can define any number of Size sections to list node sizes you're
# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
#
# The Size fields are interpreted the same way as with a libcloud NodeSize:
# http://libcloud.readthedocs.org/en/latest/compute/api.html#libcloud.compute.base.NodeSize
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs.
+# You can also override Google's provided data fields (such as price per hour)
+# by setting them here.
+
+[Size n1-standard-2]
cores = 2
+price = 0.076
scratch = 100
-ram = 512
+
+[Size n1-standard-4]
+cores = 4
+price = 0.152
+scratch = 200
\ No newline at end of file
import pykka
import arvnodeman.daemon as nmdaemon
+from arvnodeman.jobqueue import ServerCalculator
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
+import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
- def new_setup_proxy(self):
+ def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
# it gets a new mock object back.
- self.last_setup = mock.MagicMock(name='setup_proxy_mock')
- return self.last_setup
+ get_cloud_size = mock.MagicMock()
+ get_cloud_size.get.return_value = kwargs["cloud_size"]
+ mock_actor = mock.MagicMock()
+ mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
+ cloud_size=get_cloud_size,
+ actor_ref=mock_actor)
+ mock_actor.proxy.return_value = mock_proxy
+
+ self.last_setup = mock_proxy
+ return mock_actor
+
+ def mock_node_shutdown(self, **kwargs):
+ # Make sure that every time the daemon starts a shutdown actor,
+ # it gets a new mock object back.
+ get_cloud_node = mock.MagicMock()
+ if "node_monitor" in kwargs:
+ get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
+ mock_actor = mock.MagicMock()
+ mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
+ cloud_node=get_cloud_node,
+ actor_ref=mock_actor)
+
+ mock_actor.proxy.return_value = mock_proxy
+ self.last_shutdown = mock_proxy
+
+ return mock_actor
def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
- min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
+ avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
+ min_nodes=0, max_nodes=8,
+ shutdown_windows=[54, 5, 1],
+ max_total_price=None):
for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
self.arv_factory = mock.MagicMock(name='arvados_mock')
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
self.timer = testutil.MockTimer(deliver_immediately=False)
+
self.node_setup = mock.MagicMock(name='setup_mock')
- self.node_setup.start().proxy.side_effect = self.new_setup_proxy
+ self.node_setup.start.side_effect = self.mock_node_start
self.node_setup.reset_mock()
+
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+ self.node_shutdown.start.side_effect = self.mock_node_shutdown
+
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
self.server_wishlist_poller, self.arvados_nodes_poller,
self.cloud_nodes_poller, self.cloud_updates, self.timer,
self.arv_factory, self.cloud_factory,
- [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
- self.node_setup, self.node_shutdown).proxy()
+ shutdown_windows, ServerCalculator(avail_sizes),
+ min_nodes, max_nodes, 600, 1800, 3600,
+ self.node_setup, self.node_shutdown,
+ max_total_price=max_total_price).proxy()
if cloud_nodes is not None:
self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if arvados_nodes is not None:
def test_old_arvados_node_not_double_assigned(self):
arv_node = testutil.arvados_node_mock(3, age=9000)
size = testutil.MockSize(3)
- self.make_daemon(arvados_nodes=[arv_node])
+ self.make_daemon(arvados_nodes=[arv_node], avail_sizes=[(size, {"cores":1})])
self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.daemon.shutdowns.get()[cloud_nodes[1].id] = True
+
+ get_cloud_node = mock.MagicMock(name="get_cloud_node")
+ get_cloud_node.get.return_value = cloud_nodes[1]
+ mock_node_monitor = mock.MagicMock()
+ mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
+ mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
+
+ self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
- self.assertEqual(0, self.node_shutdown.start.call_count)
+ self.assertEqual(1, self.node_shutdown.start.call_count)
def test_booting_nodes_counted(self):
cloud_node = testutil.cloud_node_mock(1)
self.assertEqual(1, self.node_setup.start.call_count)
def test_boot_new_node_when_all_nodes_busy(self):
+ size = testutil.MockSize(2)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
- self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
- [testutil.MockSize(2)])
+ self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
+ [size], avail_sizes=[(size, {"cores":1})])
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
wish_size = testutil.MockSize(3)
- self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
+ avail_sizes = [(min_size, {"cores": 1}),
+ (wish_size, {"cores": 3})]
+ self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
cloud_node = testutil.cloud_node_mock(id_num)
if arv_node is None:
arv_node = testutil.arvados_node_mock(id_num)
- self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
+ self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
+ avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
self.daemon.max_nodes.get(self.TIMEOUT)
self.assertEqual(1, self.node_setup.start.call_count)
self.last_setup.cloud_node.get.return_value = cloud_node
def test_all_booting_nodes_tried_to_shut_down(self):
size = testutil.MockSize(2)
- self.make_daemon(want_sizes=[size])
+ self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
self.daemon.max_nodes.get(self.TIMEOUT)
setup1 = self.last_setup
setup1.stop_if_no_cloud_node().get.return_value = False
self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
- shutdown_proxy = self.node_shutdown.start().proxy
- shutdown_proxy().cloud_node.get.return_value = cloud_node
- shutdown_proxy().success.get.return_value = False
- shutdown_proxy.reset_mock()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(shutdown_proxy.called)
- self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
- shutdown_proxy().success.get.return_value = True
- shutdown_proxy.reset_mock()
+ self.last_shutdown.success.get.return_value = False
+ self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+ self.assertEqual(1, self.alive_monitor_count())
+
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
- self.assertTrue(shutdown_proxy.called)
+ self.last_shutdown.success.get.return_value = True
+ self.last_shutdown.stop.side_effect = lambda: monitor.stop()
+ self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+ self.assertEqual(0, self.alive_monitor_count())
def test_broken_node_blackholed_after_cancelled_shutdown(self):
- cloud_node = testutil.cloud_node_mock(8)
- wishlist = [testutil.MockSize(8)]
+ size = testutil.MockSize(8)
+ cloud_node = testutil.cloud_node_mock(8, size=size)
+ wishlist = [size]
self.make_daemon([cloud_node], [testutil.arvados_node_mock(8)],
- wishlist)
+ wishlist, avail_sizes=[(size, {"cores":1})])
self.assertEqual(1, self.alive_monitor_count())
self.assertFalse(self.node_setup.start.called)
monitor = self.monitor_list()[0].proxy()
self.assertEqual(1, self.node_setup.start.call_count)
def test_nodes_shutting_down_replaced_below_max_nodes(self):
- cloud_node = testutil.cloud_node_mock(6)
- self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
+ size = testutil.MockSize(6)
+ cloud_node = testutil.cloud_node_mock(6, size=size)
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)],
+ avail_sizes=[(size, {"cores":1})])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertFalse(self.node_setup.start.called)
def test_nodes_shutting_down_count_against_excess(self):
- cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
- arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
- self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
+ size = testutil.MockSize(8)
+ cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
+ arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
+ self.make_daemon(cloud_nodes, arv_nodes, [size],
+ avail_sizes=[(size, {"cores":1})])
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertEqual(
- 1, self.node_shutdown.start().proxy().stop().get.call_count)
+ 1, self.last_shutdown.stop.call_count)
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
# We're mainly testing that update_cloud_nodes catches and handles
# the ActorDeadError.
- stop_method = self.node_shutdown.start().proxy().stop().get
- stop_method.side_effect = pykka.ActorDeadError
+ self.last_shutdown.stop.side_effect = pykka.ActorDeadError
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(1, stop_method.call_count)
+ self.assertEqual(1, self.last_shutdown.stop.call_count)
+
+ def busywait(self, f):
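+ # Poll f() every 0.1s for up to one second, then assert that it returns true.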
+ n = 0
+ while not f() and n < 10:
+ time.sleep(.1)
+ n += 1
+ self.assertTrue(f())
+
+ def test_node_create_two_sizes(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=4)
+
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 4)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+ self.assertEqual(3, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ def test_node_max_nodes_two_sizes(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=3)
+
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ self.assertEqual(2, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ def test_wishlist_reconfigure(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
+
+ self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
+ testutil.cloud_node_mock(2, small),
+ testutil.cloud_node_mock(3, big)],
+ arvados_nodes=[testutil.arvados_node_mock(1),
+ testutil.arvados_node_mock(2),
+ testutil.arvados_node_mock(3)],
+ want_sizes=[small, small, big],
+ avail_sizes=avail_sizes)
+
+ self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
+
+ self.assertEqual(0, self.node_shutdown.start.call_count)
+
+ for c in self.daemon.cloud_nodes.get().nodes.itervalues():
+ self.daemon.node_can_shutdown(c.actor)
+
+ booting = self.daemon.booting.get()
+ shutdowns = self.daemon.shutdowns.get()
+
+ self.stop_proxy(self.daemon)
+
+ self.assertEqual(1, self.node_setup.start.call_count)
+ self.assertEqual(1, self.node_shutdown.start.call_count)
+
+ # booting a new big node
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ self.assertEqual(0, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ # shutting down a small node
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in shutdowns.itervalues():
+ sizecounts[b.cloud_node.get().size.id] += 1
+ self.assertEqual(1, sizecounts[small.id])
+ self.assertEqual(0, sizecounts[big.id])
+
+ def test_node_max_price(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+ (testutil.MockSize(2), {"cores":2, "price":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes,
+ max_nodes=4,
+ max_total_price=4)
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get()
+ self.stop_proxy(self.daemon)
+
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+
+ # Booting 3 small nodes and not booting a big node would also partially
+ # satisfy the wishlist and come in under the price cap; however, the way
+ # update_server_wishlist() currently works effectively results in a
+ # round-robin creation of one node of each size in the wishlist, so
+ # test for that.
+ self.assertEqual(2, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
{'min_scratch_mb_per_node': 200})
self.assertEqual(6, len(servlist))
+ def test_ignore_too_expensive_jobs(self):
+ servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 1, 'min_nodes': 6})
+ self.assertEqual(6, len(servlist))
+
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 2, 'min_nodes': 6})
+ self.assertEqual(0, len(servlist))
+
def test_job_requesting_max_nodes_accepted(self):
servcalc = self.make_calculator([1], max_nodes=4)
servlist = self.calculate(servcalc, {'min_nodes': 4})
servcalc = self.make_calculator([2, 4, 1, 3])
self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
+ def test_next_biggest(self):
+ servcalc = self.make_calculator([1, 2, 4, 8])
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 3},
+ {'min_cores_per_node': 6})
+ self.assertEqual([servcalc.cloud_sizes[2].id,
+ servcalc.cloud_sizes[3].id],
+ [s.id for s in servlist])
+
+ def test_multiple_sizes(self):
+ servcalc = self.make_calculator([1, 2])
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1})
+ self.assertEqual([servcalc.cloud_sizes[1].id,
+ servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[0].id],
+ [s.id for s in servlist])
+
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1})
+ self.assertEqual([servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[1].id,
+ servcalc.cloud_sizes[0].id],
+ [s.id for s in servlist])
+
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2})
+ self.assertEqual([servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[1].id],
+ [s.id for s in servlist])
+
+
class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
unittest.TestCase):
if __name__ == '__main__':
unittest.main()
-
cloud_object.extra = extra
return cloud_object
-def cloud_node_mock(node_num=99, **extra):
- node = mock.NonCallableMagicMock(
- ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
- 'image', 'extra'],
- name='cloud_node')
- node.id = str(node_num)
- node.name = node.id
- node.public_ips = []
- node.private_ips = [ip_address_mock(node_num)]
- node.extra = extra
- return node
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
self.subscriber = mock.Mock(name='subscriber_mock')
self.monitor = self.TEST_CLASS.start(
self.client, self.timer, *args, **kwargs).proxy()
+
+def cloud_node_mock(node_num=99, size=MockSize(1), **extra):
+ node = mock.NonCallableMagicMock(
+ ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
+ 'image', 'extra'],
+ name='cloud_node')
+ node.id = str(node_num)
+ node.name = node.id
+ node.size = size
+ node.public_ips = []
+ node.private_ips = [ip_address_mock(node_num)]
+ node.extra = extra
+ return node