Merge branch '5353-node-sizes' closes #5353
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 18 Nov 2015 17:09:47 +0000 (12:09 -0500)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 18 Nov 2015 17:09:47 +0000 (12:09 -0500)
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/testutil.py

index 315df1c3f984e29a0edfc09c71f76051def7480d..af9c8c0ced908c75d635fea17efeb077de38af63 100644 (file)
@@ -41,6 +41,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'poll_time': '60',
                        'max_poll_time': '300',
                        'poll_stale_after': '600',
+                       'max_total_price': '0',
                        'boot_fail_after': str(sys.maxint),
                        'node_stale_after': str(60 * 60 * 2)},
             'Logging': {'file': '/dev/stderr',
index a65e9a0705d1cd5941140a20dbd4c6d4e1f5fd57..64bb177de56f84e75e2b01dca644db7f35d9433f 100644 (file)
@@ -103,13 +103,15 @@ class NodeManagerDaemonActor(actor_class):
     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                  cloud_nodes_actor, cloud_update_actor, timer_actor,
                  arvados_factory, cloud_factory,
-                 shutdown_windows, min_size, min_nodes, max_nodes,
+                 shutdown_windows, server_calculator,
+                 min_nodes, max_nodes,
                  poll_stale_after=600,
                  boot_fail_after=1800,
                  node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
-                 node_actor_class=dispatch.ComputeNodeMonitorActor):
+                 node_actor_class=dispatch.ComputeNodeMonitorActor,
+                 max_total_price=0):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -122,9 +124,11 @@ class NodeManagerDaemonActor(actor_class):
         self._logger = logging.getLogger('arvnodeman.daemon')
         self._later = self.actor_ref.proxy()
         self.shutdown_windows = shutdown_windows
-        self.min_cloud_size = min_size
+        self.server_calculator = server_calculator
+        self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
@@ -210,51 +214,89 @@ class NodeManagerDaemonActor(actor_class):
                     self._pair_nodes(cloud_rec, arv_node)
                     break
 
-    def _nodes_up(self):
-        return sum(len(nodelist) for nodelist in
-                   [self.cloud_nodes, self.booted, self.booting])
-
-    def _nodes_busy(self):
+    def _nodes_up(self, size):
+        up = 0
+        up += sum(1
+                  for c in self.booting.itervalues()
+                  if size is None or c.cloud_size.get().id == size.id)
+        up += sum(1
+                  for i in (self.booted, self.cloud_nodes.nodes)
+                  for c in i.itervalues()
+                  if size is None or (c.cloud_node.size and c.cloud_node.size.id == size.id))
+        return up
+
+    def _total_price(self):
+        cost = 0
+        cost += sum(c.cloud_size.get().price
+                  for c in self.booting.itervalues())
+        cost += sum(c.cloud_node.size.price
+                    for i in (self.booted, self.cloud_nodes.nodes)
+                    for c in i.itervalues()
+                    if c.cloud_node.size)
+        return cost
+
+    def _nodes_busy(self, size):
         return sum(1 for busy in
                    pykka.get_all(rec.actor.in_state('busy') for rec in
-                                 self.cloud_nodes.nodes.itervalues())
+                                 self.cloud_nodes.nodes.itervalues()
+                                 if (rec.cloud_node.size and rec.cloud_node.size.id == size.id))
                    if busy)
 
-    def _nodes_missing(self):
+    def _nodes_missing(self, size):
         return sum(1 for arv_node in
                    pykka.get_all(rec.actor.arvados_node for rec in
                                  self.cloud_nodes.nodes.itervalues()
-                                 if rec.actor.cloud_node.get().id not in self.shutdowns)
+                                 if rec.cloud_node.size and rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
 
-    def _nodes_wanted(self):
-        up_count = self._nodes_up()
-        under_min = self.min_nodes - up_count
-        over_max = up_count - self.max_nodes
+    def _size_wishlist(self, size):
+        return sum(1 for c in self.last_wishlist if c.id == size.id)
+
+    def _size_shutdowns(self, size):
+        return sum(1 for c in self.shutdowns.itervalues()
+                   if c.cloud_node.get().size.id == size.id)
+
+    def _nodes_wanted(self, size):
+        total_up_count = self._nodes_up(None)
+        under_min = self.min_nodes - total_up_count
+        over_max = total_up_count - self.max_nodes
+        total_price = self._total_price()
+
         if over_max >= 0:
             return -over_max
-        elif under_min > 0:
+        elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
+
+        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+                                           self._nodes_busy(size) +
+                                           self._nodes_missing(size))
+
+        wanted = self._size_wishlist(size) - up_count
+        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
+            can_boot = int((self.max_total_price - total_price) / size.price)
+            if can_boot == 0:
+                self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
+                                  size.name, size.price, self.max_total_price, total_price)
+            return can_boot
         else:
-            up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
-            return len(self.last_wishlist) - up_count
-
-    def _nodes_excess(self):
-        up_count = self._nodes_up() - len(self.shutdowns)
-        over_min = up_count - self.min_nodes
-        if over_min <= 0:
-            return over_min
-        else:
-            return up_count - self._nodes_busy() - len(self.last_wishlist)
+            return wanted
+
+    def _nodes_excess(self, size):
+        up_count = self._nodes_up(size) - self._size_shutdowns(size)
+        if size.id == self.min_cloud_size.id:
+            up_count -= self.min_nodes
+        return up_count - self._nodes_busy(size) - self._size_wishlist(size)
 
     def update_server_wishlist(self, wishlist):
         self._update_poll_time('server_wishlist')
         self.last_wishlist = wishlist
-        nodes_wanted = self._nodes_wanted()
-        if nodes_wanted > 0:
-            self._later.start_node()
-        elif (nodes_wanted < 0) and self.booting:
-            self._later.stop_booting_node()
+        for sz in reversed(self.server_calculator.cloud_sizes):
+            size = sz.real
+            nodes_wanted = self._nodes_wanted(size)
+            if nodes_wanted > 0:
+                self._later.start_node(size)
+            elif (nodes_wanted < 0) and self.booting:
+                self._later.stop_booting_node(size)
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -274,15 +316,11 @@ class NodeManagerDaemonActor(actor_class):
         return wrapper
 
     @_check_poll_freshness
-    def start_node(self):
-        nodes_wanted = self._nodes_wanted()
+    def start_node(self, cloud_size):
+        nodes_wanted = self._nodes_wanted(cloud_size)
         if nodes_wanted < 1:
             return None
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        try:
-            cloud_size = self.last_wishlist[self._nodes_up()]
-        except IndexError:
-            cloud_size = self.min_cloud_size
         self._logger.info("Want %s more nodes.  Booting a %s node.",
                           nodes_wanted, cloud_size.name)
         new_setup = self._node_setup.start(
@@ -297,7 +335,7 @@ class NodeManagerDaemonActor(actor_class):
                 time.time())
         new_setup.subscribe(self._later.node_up)
         if nodes_wanted > 1:
-            self._later.start_node()
+            self._later.start_node(cloud_size)
 
     def _get_actor_attrs(self, actor, *attr_names):
         return pykka.get_all([getattr(actor, name) for name in attr_names])
@@ -314,15 +352,15 @@ class NodeManagerDaemonActor(actor_class):
                              self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
-    def stop_booting_node(self):
-        nodes_excess = self._nodes_excess()
+    def stop_booting_node(self, size):
+        nodes_excess = self._nodes_excess(size)
         if (nodes_excess < 1) or not self.booting:
             return None
         for key, node in self.booting.iteritems():
-            if node.stop_if_no_cloud_node().get():
+            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                 del self.booting[key]
                 if nodes_excess > 1:
-                    self._later.stop_booting_node()
+                    self._later.stop_booting_node(size)
                 break
 
     def _begin_node_shutdown(self, node_actor, cancellable):
@@ -338,7 +376,7 @@ class NodeManagerDaemonActor(actor_class):
 
     @_check_poll_freshness
     def node_can_shutdown(self, node_actor):
-        if self._nodes_excess() > 0:
+        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
             self._begin_node_shutdown(node_actor, cancellable=True)
 
     def shutdown_unpaired_node(self, cloud_node_id):
index 06f66b71c244f5662fb0c9871f7c92b0603f1617..8f78ba1df56715a1b0e394a03f1c7da553703892 100644 (file)
@@ -38,11 +38,12 @@ class ServerCalculator(object):
             return True
 
 
-    def __init__(self, server_list, max_nodes=None):
+    def __init__(self, server_list, max_nodes=None, max_price=None):
         self.cloud_sizes = [self.CloudSizeWrapper(s, **kws)
                             for s, kws in server_list]
         self.cloud_sizes.sort(key=lambda s: s.price)
         self.max_nodes = max_nodes or float('inf')
+        self.max_price = max_price or float('inf')
         self.logger = logging.getLogger('arvnodeman.jobqueue')
         self.logged_jobs = set()
 
@@ -75,7 +76,7 @@ class ServerCalculator(object):
                 if job['uuid'] not in self.logged_jobs:
                     self.logged_jobs.add(job['uuid'])
                     self.logger.debug("job %s not satisfiable", job['uuid'])
-            elif (want_count <= self.max_nodes):
+            elif (want_count <= self.max_nodes) and (want_count*cloud_size.price <= self.max_price):
                 servers.extend([cloud_size.real] * max(1, want_count))
         self.logged_jobs.intersection_update(seen_jobs)
         return servers
index 880158234da668ca212604252a070b8db30cddbd..e8c2fe661e5203469a1ee87341159aeb6cdc1aec 100644 (file)
@@ -62,7 +62,8 @@ def build_server_calculator(config):
     if not cloud_size_list:
         abort("No valid node sizes configured")
     return ServerCalculator(cloud_size_list,
-                            config.getint('Daemon', 'max_nodes'))
+                            config.getint('Daemon', 'max_nodes'),
+                            config.getfloat('Daemon', 'max_total_price'))
 
 def launch_pollers(config, server_calculator):
     poll_time = config.getint('Daemon', 'poll_time')
@@ -114,13 +115,14 @@ def main(args=None):
         cloud_node_updater, timer,
         config.new_arvados_client, config.new_cloud_client,
         config.shutdown_windows(),
-        server_calculator.cheapest_size(),
+        server_calculator,
         config.getint('Daemon', 'min_nodes'),
         config.getint('Daemon', 'max_nodes'),
         config.getint('Daemon', 'poll_stale_after'),
         config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
+        node_setup, node_shutdown, node_monitor,
+        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
 
     signal.pause()
     daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
index 50161c288c723a7a9a2753622a8f2481fd6484f7..0b5f1d360e8887a48637b68a94908b98160bc932 100644 (file)
@@ -7,14 +7,22 @@
 # through SLURM before shutting them down.
 #dispatcher = slurm
 
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times.  If node manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type.  However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes
 min_nodes = 0
 
 # Node Manager will not start any compute nodes when at least this
 # many are running.
 max_nodes = 8
 
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
 # Poll Azure nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -138,16 +146,25 @@ tag_cluster = zyxwv
 # the API server to ping
 ping_host = hostname:port
 
-[Size Standard_D3]
-# You can define any number of Size sections to list Azure sizes you're
-# willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use.  The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue.  You must also provide price per hour as the Azure driver
+# compute currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
 # Each size section MUST define the number of cores are available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Microsoft's provided
-# data fields by setting the same names here.
+# data fields by setting them here.
+
+[Size Standard_D3]
 cores = 4
-scratch = 200
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
index 9b41ca14d54fc52956650649170574e90d99ae78..1c33bfd4578b62e403a057de3787c44caa198be4 100644 (file)
@@ -7,14 +7,22 @@
 # through SLURM before shutting them down.
 #dispatcher = slurm
 
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times.  If node manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type.  However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes
 min_nodes = 0
 
 # Node Manager will not start any compute nodes when at least this
 # many are running.
 max_nodes = 8
 
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
 # Poll EC2 nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -123,16 +131,24 @@ subnet_id = idstring
 # compute node.
 security_groups = idstring1, idstring2
 
-[Size t2.medium]
+
 # You can define any number of Size sections to list EC2 sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
+#
 # Each size section MUST define the number of cores are available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Amazon's provided
-# data fields by setting the same names here.
+# data fields (such as price per hour) by setting them here.
+
+[Size m4.large]
 cores = 2
+price = 0.126
+scratch = 100
+
+[Size m4.xlarge]
+cores = 4
+price = 0.252
 scratch = 100
index 67703703ba51b8ced8eac18b0ab9f72d65b16b08..06fcdff1552de44f95f4ea263b0e6d63db4d3409 100644 (file)
@@ -2,17 +2,25 @@
 # All times are in seconds unless specified otherwise.
 
 [Daemon]
-# Node Manager will ensure that there are at least this many nodes
-# running at all times.
+# Node Manager will ensure that there are at least this many nodes running at
+# all times.  If node manager needs to start new idle nodes for the purpose of
+# satisfying min_nodes, it will use the cheapest node type.  However, depending
+# on usage patterns, it may also satisfy min_nodes by keeping alive some
+# more-expensive nodes
 min_nodes = 0
 
 # Node Manager will not start any compute nodes when at least this
-# many are running.
+# running at all times.  By default, these will be the cheapest node size.
 max_nodes = 8
 
 # Poll compute nodes and Arvados for new information every N seconds.
 poll_time = 60
 
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
 # Polls have exponential backoff when services fail to respond.
 # This is the longest time to wait between polls.
 max_poll_time = 300
@@ -118,11 +126,10 @@ image = debian-7
 # See http://libcloud.readthedocs.org/en/latest/compute/drivers/gce.html#specifying-service-account-scopes
 # service_accounts = [{'email':'account@example.com', 'scopes':['storage-ro']}]
 
-[Size n1-standard-2]
+
 # You can define any number of Size sections to list node sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
 #
 # The Size fields are interpreted the same way as with a libcloud NodeSize:
 # http://libcloud.readthedocs.org/en/latest/compute/api.html#libcloud.compute.base.NodeSize
@@ -135,6 +142,15 @@ image = debian-7
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.
+# You can also override Google's provided data fields (such as price per hour)
+# by setting them here.
+
+[Size n1-standard-2]
 cores = 2
+price = 0.076
 scratch = 100
-ram = 512
+
+[Size n1-standard-4]
+cores = 4
+price = 0.152
+scratch = 200
\ No newline at end of file
index bbfbe4b7452504ad9935729b1654821bddbd3903..200919bfb819183dfa8b50b426c7d33d60d64db9 100644 (file)
@@ -9,19 +9,48 @@ import mock
 import pykka
 
 import arvnodeman.daemon as nmdaemon
+from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
+import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
-    def new_setup_proxy(self):
+    def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
         # it gets a new mock object back.
-        self.last_setup = mock.MagicMock(name='setup_proxy_mock')
-        return self.last_setup
+        get_cloud_size = mock.MagicMock()
+        get_cloud_size.get.return_value = kwargs["cloud_size"]
+        mock_actor = mock.MagicMock()
+        mock_proxy = mock.NonCallableMock(name='setup_mock_proxy',
+                                          cloud_size=get_cloud_size,
+                                          actor_ref=mock_actor)
+        mock_actor.proxy.return_value = mock_proxy
+
+        self.last_setup = mock_proxy
+        return mock_actor
+
+    def mock_node_shutdown(self, **kwargs):
+        # Make sure that every time the daemon starts a shutdown actor,
+        # it gets a new mock object back.
+        get_cloud_node = mock.MagicMock()
+        if "node_monitor" in kwargs:
+            get_cloud_node.get.return_value = kwargs["node_monitor"].proxy().cloud_node.get()
+        mock_actor = mock.MagicMock()
+        mock_proxy = mock.NonCallableMock(name='shutdown_mock_proxy',
+                                          cloud_node=get_cloud_node,
+                                          actor_ref=mock_actor)
+
+        mock_actor.proxy.return_value = mock_proxy
+        self.last_shutdown = mock_proxy
+
+        return mock_actor
 
     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
-                    min_size=testutil.MockSize(1), min_nodes=0, max_nodes=8):
+                    avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
+                    min_nodes=0, max_nodes=8,
+                    shutdown_windows=[54, 5, 1],
+                    max_total_price=None):
         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
         self.arv_factory = mock.MagicMock(name='arvados_mock')
@@ -29,16 +58,22 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.cloud_factory().node_start_time.return_value = time.time()
         self.cloud_updates = mock.MagicMock(name='updates_mock')
         self.timer = testutil.MockTimer(deliver_immediately=False)
+
         self.node_setup = mock.MagicMock(name='setup_mock')
-        self.node_setup.start().proxy.side_effect = self.new_setup_proxy
+        self.node_setup.start.side_effect = self.mock_node_start
         self.node_setup.reset_mock()
+
         self.node_shutdown = mock.MagicMock(name='shutdown_mock')
+        self.node_shutdown.start.side_effect = self.mock_node_shutdown
+
         self.daemon = nmdaemon.NodeManagerDaemonActor.start(
             self.server_wishlist_poller, self.arvados_nodes_poller,
             self.cloud_nodes_poller, self.cloud_updates, self.timer,
             self.arv_factory, self.cloud_factory,
-            [54, 5, 1], min_size, min_nodes, max_nodes, 600, 1800, 3600,
-            self.node_setup, self.node_shutdown).proxy()
+            shutdown_windows, ServerCalculator(avail_sizes),
+            min_nodes, max_nodes, 600, 1800, 3600,
+            self.node_setup, self.node_shutdown,
+            max_total_price=max_total_price).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if arvados_nodes is not None:
@@ -109,7 +144,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def test_old_arvados_node_not_double_assigned(self):
         arv_node = testutil.arvados_node_mock(3, age=9000)
         size = testutil.MockSize(3)
-        self.make_daemon(arvados_nodes=[arv_node])
+        self.make_daemon(arvados_nodes=[arv_node], avail_sizes=[(size, {"cores":1})])
         self.daemon.update_server_wishlist([size]).get(self.TIMEOUT)
         self.daemon.update_server_wishlist([size, size]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -165,11 +200,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.daemon.shutdowns.get()[cloud_nodes[1].id] = True
+
+        get_cloud_node = mock.MagicMock(name="get_cloud_node")
+        get_cloud_node.get.return_value = cloud_nodes[1]
+        mock_node_monitor = mock.MagicMock()
+        mock_node_monitor.proxy.return_value = mock.NonCallableMock(cloud_node=get_cloud_node)
+        mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
+
+        self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
-        self.assertEqual(0, self.node_shutdown.start.call_count)
+        self.assertEqual(1, self.node_shutdown.start.call_count)
 
     def test_booting_nodes_counted(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -183,16 +226,19 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(1, self.node_setup.start.call_count)
 
     def test_boot_new_node_when_all_nodes_busy(self):
+        size = testutil.MockSize(2)
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
-        self.make_daemon([testutil.cloud_node_mock(2)], [arv_node],
-                         [testutil.MockSize(2)])
+        self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
+                         [size], avail_sizes=[(size, {"cores":1})])
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_setup.start.called)
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
         wish_size = testutil.MockSize(3)
-        self.make_daemon([], [], None, min_size=min_size, min_nodes=2)
+        avail_sizes = [(min_size, {"cores": 1}),
+                       (wish_size, {"cores": 3})]
+        self.make_daemon([], [], None, avail_sizes=avail_sizes, min_nodes=2)
         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.daemon.update_server_wishlist([wish_size]).get(self.TIMEOUT)
@@ -222,7 +268,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             cloud_node = testutil.cloud_node_mock(id_num)
         if arv_node is None:
             arv_node = testutil.arvados_node_mock(id_num)
-        self.make_daemon(want_sizes=[testutil.MockSize(id_num)])
+        self.make_daemon(want_sizes=[testutil.MockSize(id_num)],
+                         avail_sizes=[(testutil.MockSize(id_num), {"cores":1})])
         self.daemon.max_nodes.get(self.TIMEOUT)
         self.assertEqual(1, self.node_setup.start.call_count)
         self.last_setup.cloud_node.get.return_value = cloud_node
@@ -373,7 +420,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
     def test_all_booting_nodes_tried_to_shut_down(self):
         size = testutil.MockSize(2)
-        self.make_daemon(want_sizes=[size])
+        self.make_daemon(want_sizes=[size], avail_sizes=[(size, {"cores":1})])
         self.daemon.max_nodes.get(self.TIMEOUT)
         setup1 = self.last_setup
         setup1.stop_if_no_cloud_node().get.return_value = False
@@ -437,23 +484,23 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(5)])
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
-        shutdown_proxy = self.node_shutdown.start().proxy
-        shutdown_proxy().cloud_node.get.return_value = cloud_node
-        shutdown_proxy().success.get.return_value = False
-        shutdown_proxy.reset_mock()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.assertTrue(shutdown_proxy.called)
-        self.daemon.node_finished_shutdown(shutdown_proxy()).get(self.TIMEOUT)
-        shutdown_proxy().success.get.return_value = True
-        shutdown_proxy.reset_mock()
+        self.last_shutdown.success.get.return_value = False
+        self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+        self.assertEqual(1, self.alive_monitor_count())
+
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
-        self.assertTrue(shutdown_proxy.called)
+        self.last_shutdown.success.get.return_value = True
+        self.last_shutdown.stop.side_effect = lambda: monitor.stop()
+        self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
+        self.assertEqual(0, self.alive_monitor_count())
 
     def test_broken_node_blackholed_after_cancelled_shutdown(self):
-        cloud_node = testutil.cloud_node_mock(8)
-        wishlist = [testutil.MockSize(8)]
+        size = testutil.MockSize(8)
+        cloud_node = testutil.cloud_node_mock(8, size=size)
+        wishlist = [size]
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(8)],
-                         wishlist)
+                         wishlist, avail_sizes=[(size, {"cores":1})])
         self.assertEqual(1, self.alive_monitor_count())
         self.assertFalse(self.node_setup.start.called)
         monitor = self.monitor_list()[0].proxy()
@@ -470,8 +517,10 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(1, self.node_setup.start.call_count)
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
-        cloud_node = testutil.cloud_node_mock(6)
-        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)])
+        size = testutil.MockSize(6)
+        cloud_node = testutil.cloud_node_mock(6, size=size)
+        self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)],
+                         avail_sizes=[(size, {"cores":1})])
         self.assertEqual(1, self.alive_monitor_count())
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
@@ -495,9 +544,11 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertFalse(self.node_setup.start.called)
 
     def test_nodes_shutting_down_count_against_excess(self):
-        cloud_nodes = [testutil.cloud_node_mock(n) for n in [8, 9]]
-        arv_nodes = [testutil.arvados_node_mock(n) for n in [8, 9]]
-        self.make_daemon(cloud_nodes, arv_nodes, [testutil.MockSize(8)])
+        size = testutil.MockSize(8)
+        cloud_nodes = [testutil.cloud_node_mock(n, size=size) for n in [8, 9]]
+        arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
+        self.make_daemon(cloud_nodes, arv_nodes, [size],
+                         avail_sizes=[(size, {"cores":1})])
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
@@ -534,7 +585,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertEqual(
-            1, self.node_shutdown.start().proxy().stop().get.call_count)
+            1, self.last_shutdown.stop.call_count)
 
     def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
         self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
@@ -543,8 +594,124 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         # We're mainly testing that update_cloud_nodes catches and handles
         # the ActorDeadError.
-        stop_method = self.node_shutdown.start().proxy().stop().get
-        stop_method.side_effect = pykka.ActorDeadError
+        self.last_shutdown.stop.side_effect = pykka.ActorDeadError
         self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(1, stop_method.call_count)
+        self.assertEqual(1, self.last_shutdown.stop.call_count)
+
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 10:
+            time.sleep(.1)
+            n += 1
+        self.assertTrue(f())
+
+    def test_node_create_two_sizes(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=4)
+
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 4)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
+
+    def test_node_max_nodes_two_sizes(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        self.assertEqual(2, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
+
+    def test_wishlist_reconfigure(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(small, {"cores":1}), (big, {"cores":2})]
+
+        self.make_daemon(cloud_nodes=[testutil.cloud_node_mock(1, small),
+                                      testutil.cloud_node_mock(2, small),
+                                      testutil.cloud_node_mock(3, big)],
+                         arvados_nodes=[testutil.arvados_node_mock(1),
+                                        testutil.arvados_node_mock(2),
+                                        testutil.arvados_node_mock(3)],
+                         want_sizes=[small, small, big],
+                         avail_sizes=avail_sizes)
+
+        self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
+
+        self.assertEqual(0, self.node_shutdown.start.call_count)
+
+        for c in self.daemon.cloud_nodes.get().nodes.itervalues():
+            self.daemon.node_can_shutdown(c.actor)
+
+        booting = self.daemon.booting.get()
+        shutdowns = self.daemon.shutdowns.get()
+
+        self.stop_proxy(self.daemon)
+
+        self.assertEqual(1, self.node_setup.start.call_count)
+        self.assertEqual(1, self.node_shutdown.start.call_count)
+
+        # booting a new big node
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        self.assertEqual(0, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
+
+        # shutting down a small node
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in shutdowns.itervalues():
+            sizecounts[b.cloud_node.get().size.id] += 1
+        self.assertEqual(1, sizecounts[small.id])
+        self.assertEqual(0, sizecounts[big.id])
+
+    def test_node_max_price(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+                        (testutil.MockSize(2), {"cores":2, "price":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes,
+                         max_nodes=4,
+                         max_total_price=4)
+        # the daemon runs in another thread, so we need to wait and see
+        # if it does all the work we're expecting it to do before stopping it.
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get()
+        self.stop_proxy(self.daemon)
+
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+
+        # Booting 3 small nodes and not booting a big node would also partially
+        # satisfy the wishlist and come in under the price cap, however the way
+        # the update_server_wishlist() currently works effectively results in a
+        # round-robin creation of one node of each size in the wishlist, so
+        # test for that.
+        self.assertEqual(2, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
index 4c97aed8b109a576843105bad7cf7f62b14426ce..d4dc42f13959e5bef28aad66dd3ef4d0e0abe13e 100644 (file)
@@ -48,6 +48,16 @@ class ServerCalculatorTestCase(unittest.TestCase):
                                   {'min_scratch_mb_per_node': 200})
         self.assertEqual(6, len(servlist))
 
+    def test_ignore_too_expensive_jobs(self):
+        servcalc = self.make_calculator([1, 2], max_nodes=12, max_price=6)
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1, 'min_nodes': 6})
+        self.assertEqual(6, len(servlist))
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 2, 'min_nodes': 6})
+        self.assertEqual(0, len(servlist))
+
     def test_job_requesting_max_nodes_accepted(self):
         servcalc = self.make_calculator([1], max_nodes=4)
         servlist = self.calculate(servcalc, {'min_nodes': 4})
@@ -57,6 +67,45 @@ class ServerCalculatorTestCase(unittest.TestCase):
         servcalc = self.make_calculator([2, 4, 1, 3])
         self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
 
+    def test_next_biggest(self):
+        servcalc = self.make_calculator([1, 2, 4, 8])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 3},
+                                  {'min_cores_per_node': 6})
+        self.assertEqual([servcalc.cloud_sizes[2].id,
+                          servcalc.cloud_sizes[3].id],
+                         [s.id for s in servlist])
+
+    def test_multiple_sizes(self):
+        servcalc = self.make_calculator([1, 2])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id],
+                         [s.id for s in servlist])
+
+
 
 class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
@@ -82,4 +131,3 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
 if __name__ == '__main__':
     unittest.main()
-
index 82d6479e24ae53b33b676637ca772d867326b196..aeb9768a4b310c120abce4270354bfc25d710884 100644 (file)
@@ -44,17 +44,6 @@ def cloud_object_mock(name_id, **extra):
     cloud_object.extra = extra
     return cloud_object
 
-def cloud_node_mock(node_num=99, **extra):
-    node = mock.NonCallableMagicMock(
-        ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
-         'image', 'extra'],
-        name='cloud_node')
-    node.id = str(node_num)
-    node.name = node.id
-    node.public_ips = []
-    node.private_ips = [ip_address_mock(node_num)]
-    node.extra = extra
-    return node
 
 def cloud_node_fqdn(node):
     # We intentionally put the FQDN somewhere goofy to make sure tested code is
@@ -148,3 +137,16 @@ class RemotePollLoopActorTestMixin(ActorTestMixin):
         self.subscriber = mock.Mock(name='subscriber_mock')
         self.monitor = self.TEST_CLASS.start(
             self.client, self.timer, *args, **kwargs).proxy()
+
+def cloud_node_mock(node_num=99, size=MockSize(1), **extra):
+    node = mock.NonCallableMagicMock(
+        ['id', 'name', 'state', 'public_ips', 'private_ips', 'driver', 'size',
+         'image', 'extra'],
+        name='cloud_node')
+    node.id = str(node_num)
+    node.name = node.id
+    node.size = size
+    node.public_ips = []
+    node.private_ips = [ip_address_mock(node_num)]
+    node.extra = extra
+    return node