self._pair_nodes(cloud_rec, arv_node)
break
- def _nodes_up(self):
- return sum(len(nodelist) for nodelist in
- [self.cloud_nodes, self.booted, self.booting])
-
- def _nodes_busy(self):
+ def _nodes_up(self, size):
+ up = 0
+ up += sum(1
+ for c in self.booting.itervalues()
+ if size is None or c.cloud_node.get().size.id == size.id)
+ up += sum(1
+ for i in (self.booted, self.cloud_nodes.nodes)
+ for c in i.itervalues()
+ if size is None or c.cloud_node.size.id == size.id)
+ return up
+
def _nodes_busy(self, size):
    """Count running cloud nodes of ``size`` whose monitor reports 'busy'.

    Nodes are matched on ``rec.cloud_node.size.id == size.id``; every
    matching monitor actor is asked ``in_state('busy')`` and the answers
    are collected with ``pykka.get_all``.
    """
    busy_answers = (rec.actor.in_state('busy')
                    for rec in self.cloud_nodes.nodes.itervalues()
                    if rec.cloud_node.size.id == size.id)
    return len([answer for answer in pykka.get_all(busy_answers) if answer])
def _nodes_missing(self, size):
    """Count running nodes of ``size`` whose Arvados node is reported missing.

    Records of other sizes, and records whose cloud node id is already in
    ``self.shutdowns``, are skipped. A node counts when it has an Arvados
    record and ``cnode.arvados_node_missing`` flags it using
    ``self.node_stale_after``.
    """
    # Kept lazy: pykka.get_all consumes this generator one record at a time.
    pending = (rec.actor.arvados_node
               for rec in self.cloud_nodes.nodes.itervalues()
               if rec.cloud_node.size.id == size.id and
               rec.actor.cloud_node.get().id not in self.shutdowns)
    missing = 0
    for arv_node in pykka.get_all(pending):
        if not arv_node:
            continue
        if cnode.arvados_node_missing(arv_node, self.node_stale_after):
            missing += 1
    return missing
- def _nodes_wanted(self):
- up_count = self._nodes_up()
- under_min = self.min_nodes - up_count
- over_max = up_count - self.max_nodes
+ def _size_wishlist(self, size):
+ return sum(1 for c in self.last_wishlist if c.id == size.id)
+
+ def _size_shutdowns(self, size):
+ return sum(1 for c in self.shutdowns.itervalues()
+ if c.cloud_node.get().size.id == size.id)
+
+ def _nodes_wanted(self, size):
+ total_up_count = self._nodes_up(None)
+ under_min = self.min_nodes - total_up_count
+ over_max = total_up_count - self.max_nodes
if over_max >= 0:
return -over_max
- elif under_min > 0:
+ elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
else:
- up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
- return len(self.last_wishlist) - up_count
-
- def _nodes_excess(self):
- up_count = self._nodes_up() - len(self.shutdowns)
- over_min = up_count - self.min_nodes
- if over_min <= 0:
- return over_min
- else:
- return up_count - self._nodes_busy() - len(self.last_wishlist)
+ up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+ self._nodes_busy(size) +
+ self._nodes_missing(size))
+ #self._logger.info("_nodes_up for %s is %s", size.id, self._nodes_up(size))
+ #self._logger.info("counts %s %s %s", len(self.booting), len(self.booted), len(self.cloud_nodes))
+ return self._size_wishlist(size) - up_count
+
+ def _nodes_excess(self, size):
+ up_count = self._nodes_up(size) - self._size_shutdowns(size)
+ if size.id == self.min_cloud_size.id:
+ up_count -= self.min_nodes
+ return up_count - self._nodes_busy(size) - self._size_wishlist(size)
def update_server_wishlist(self, wishlist):
    """Store the latest server wishlist and react to it for each cloud size.

    Sizes from ``self.server_calculator.cloud_sizes`` are visited in reverse
    order. For each size, a node start is scheduled when more nodes are
    wanted. When fewer are wanted and some node is still booting, a
    booting-node stop is scheduled instead.
    """
    self._update_poll_time('server_wishlist')
    self.last_wishlist = wishlist
    for calculated in reversed(self.server_calculator.cloud_sizes):
        cloud_size = calculated.real
        shortfall = self._nodes_wanted(cloud_size)
        if shortfall > 0:
            self._later.start_node(cloud_size)
            continue
        if shortfall < 0 and self.booting:
            self._later.stop_booting_node(cloud_size)
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
return wrapper
@_check_poll_freshness
- def start_node(self):
- nodes_wanted = self._nodes_wanted()
+ def start_node(self, cloud_size):
+ nodes_wanted = self._nodes_wanted(cloud_size)
if nodes_wanted < 1:
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
- try:
- cloud_size = self.last_wishlist[self._nodes_up()]
- except IndexError:
- cloud_size = self.min_cloud_size
self._logger.info("Want %s more nodes. Booting a %s node.",
nodes_wanted, cloud_size.name)
new_setup = self._node_setup.start(
time.time())
new_setup.subscribe(self._later.node_up)
if nodes_wanted > 1:
- self._later.start_node()
+ self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Read several attributes from ``actor`` and wait for all of them.

    Returns the resolved values in the same order as ``attr_names``.
    """
    futures = [getattr(actor, attr_name) for attr_name in attr_names]
    return pykka.get_all(futures)
self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self, size):
    """Try to cancel one booting node of ``size`` when there is excess.

    Nothing happens unless ``_nodes_excess(size)`` is at least 1 and some
    node is booting. Only a setup whose requested size matches, and whose
    ``stop_if_no_cloud_node()`` reports success, is removed from
    ``self.booting``. If more than one node is in excess, another attempt
    is scheduled through ``self._later``.
    """
    nodes_excess = self._nodes_excess(size)
    if nodes_excess < 1 or not self.booting:
        return None
    for key, setup in self.booting.iteritems():
        if setup.cloud_size.get().id != size.id:
            continue
        if not setup.stop_if_no_cloud_node().get():
            continue
        # Deleting is safe only because we break out of the iteration
        # immediately afterwards.
        del self.booting[key]
        if nodes_excess > 1:
            self._later.stop_booting_node(size)
        break
def _begin_node_shutdown(self, node_actor, cancellable):
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin a cancellable shutdown of ``node_actor`` if its size is in excess."""
    node_size = node_actor.cloud_node.get().size
    if self._nodes_excess(node_size) <= 0:
        return
    self._begin_node_shutdown(node_actor, cancellable=True)
def shutdown_unpaired_node(self, cloud_node_id):
from arvnodeman.jobqueue import ServerCalculator
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
+import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
def new_setup(self, **kwargs):
    """Stand-in for ``node_setup.start``: return a fresh setup mock per call.

    The mock proxies to itself and answers both ``cloud_node.get()`` (a node
    whose ``size`` is the requested ``cloud_size``) and ``cloud_size.get()``
    (the requested size itself).
    """
    # Make sure that every time the daemon starts a setup actor,
    # it gets a new mock object back.
    cloud_size = kwargs["cloud_size"]
    get_cloud_node = mock.MagicMock()
    get_cloud_node.get.return_value = mock.NonCallableMock(size=cloud_size)
    # The daemon compares a booting setup's requested size through
    # ``cloud_size.get().id`` (stop_booting_node), so expose it too;
    # otherwise the mock never matches any size.
    get_cloud_size = mock.MagicMock()
    get_cloud_size.get.return_value = cloud_size

    self.last_setup = mock.NonCallableMock(name='setup_mock',
                                           cloud_node=get_cloud_node,
                                           cloud_size=get_cloud_size)
    self.last_setup.proxy = mock.MagicMock(return_value=self.last_setup)

    return self.last_setup
def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
self.cloud_factory().node_start_time.return_value = time.time()
self.cloud_updates = mock.MagicMock(name='updates_mock')
self.timer = testutil.MockTimer(deliver_immediately=False)
+
self.node_setup = mock.MagicMock(name='setup_mock')
- self.node_setup.start().proxy.side_effect = self.new_setup_proxy
+ self.node_setup.start.side_effect = self.new_setup
self.node_setup.reset_mock()
self.node_shutdown = mock.MagicMock(name='shutdown_mock')
self.daemon = nmdaemon.NodeManagerDaemonActor.start(
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.daemon.shutdowns.get()[cloud_nodes[1].id] = True
+ self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock.MagicMock(name='shutdown_proxy_mock')
self.assertEqual(2, self.alive_monitor_count())
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
wish_size = testutil.MockSize(3)
- self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
+ avail_sizes = [(min_size, {"cores": 1}),
+ (wish_size, {"cores": 3})]
+ self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
cloud_object.extra = extra
return cloud_object
-def cloud_node_mock(node_num=99, **extra):
- node = mock.NonCallableMagicMock(
- ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
- 'image', 'extra'],
- name='cloud_node')
- node.id = str(node_num)
- node.name = node.id
- node.public_ips = []
- node.private_ips = [ip_address_mock(node_num)]
- node.extra = extra
- return node
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
self.subscriber = mock.Mock(name='subscriber_mock')
self.monitor = self.TEST_CLASS.start(
self.client, self.timer, *args, **kwargs).proxy()
+
def cloud_node_mock(node_num=99, size=None, **extra):
    """Build a cloud node mock numbered ``node_num``.

    ``id`` and ``name`` are ``str(node_num)``; ``private_ips`` holds one
    ``ip_address_mock(node_num)`` and ``public_ips`` is empty. ``size``
    defaults to a new ``MockSize(1)`` built on each call, so default nodes
    do not share one size object created at import time. Remaining keyword
    arguments become the node's ``extra`` dict.
    """
    if size is None:
        size = MockSize(1)
    node = mock.NonCallableMagicMock(
        ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
         'image', 'extra'],
        name='cloud_node')
    node.id = str(node_num)
    node.name = node.id
    node.size = size
    node.public_ips = []
    node.private_ips = [ip_address_mock(node_num)]
    node.extra = extra
    return node