5353: Added max_total_price. Added more tests for multiple node sizes.
author    Peter Amstutz <peter.amstutz@curoverse.com>
          Mon, 16 Nov 2015 21:21:34 +0000 (16:21 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
          Mon, 16 Nov 2015 21:21:34 +0000 (16:21 -0500)
Updated config file examples.

services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py

index 315df1c3f984e29a0edfc09c71f76051def7480d..777e08242e18eaceb0cf6f4360f65afd509ce559 100644 (file)
@@ -41,6 +41,7 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'poll_time': '60',
                        'max_poll_time': '300',
                        'poll_stale_after': '600',
+                       'max_total_price': '0',
                        'boot_fail_after': str(sys.maxint),
                        'node_stale_after': str(60 * 60 * 2)},
             'Logging': {'file': '/dev/stderr',
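
NodeManagerConfig subclasses ConfigParser.SafeConfigParser, so getfloat()
falls back to the string default above whenever a config file does not set
the option. A minimal sketch of that lookup using a plain SafeConfigParser
(hypothetical snippet, not part of this commit):

    import ConfigParser

    # Constructor defaults apply to every section, mirroring the
    # NodeManagerConfig defaults above.
    config = ConfigParser.SafeConfigParser({'max_total_price': '0'})
    config.add_section('Daemon')
    print config.getfloat('Daemon', 'max_total_price')  # 0.0 -> no limit
    config.set('Daemon', 'max_total_price', '4.5')
    print config.getfloat('Daemon', 'max_total_price')  # 4.5 $/hr cap
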
index f4cd4563db437e6b229b861c1352ef17bbd55d82..cd967a4130efe9a9c7746e3003108828acc8be0b 100644 (file)
@@ -110,7 +110,8 @@ class NodeManagerDaemonActor(actor_class):
                  node_stale_after=7200,
                  node_setup_class=dispatch.ComputeNodeSetupActor,
                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
-                 node_actor_class=dispatch.ComputeNodeMonitorActor):
+                 node_actor_class=dispatch.ComputeNodeMonitorActor,
+                 max_total_price=0):
         super(NodeManagerDaemonActor, self).__init__()
         self._node_setup = node_setup_class
         self._node_shutdown = node_shutdown_class
@@ -127,6 +128,7 @@ class NodeManagerDaemonActor(actor_class):
         self.min_cloud_size = self.server_calculator.cheapest_size()
         self.min_nodes = min_nodes
         self.max_nodes = max_nodes
+        self.max_total_price = max_total_price
         self.poll_stale_after = poll_stale_after
         self.boot_fail_after = boot_fail_after
         self.node_stale_after = node_stale_after
@@ -223,6 +225,15 @@ class NodeManagerDaemonActor(actor_class):
                   if size is None or c.cloud_node.size.id == size.id)
         return up
 
+    def _total_price(self):
+        cost = 0
+        cost += sum(c.cloud_size.get().price
+                    for c in self.booting.itervalues())
+        cost += sum(c.cloud_node.size.price
+                    for i in (self.booted, self.cloud_nodes.nodes)
+                    for c in i.itervalues())
+        return cost
+
     def _nodes_busy(self, size):
         return sum(1 for busy in
                    pykka.get_all(rec.actor.in_state('busy') for rec in
@@ -248,15 +259,21 @@ class NodeManagerDaemonActor(actor_class):
         total_up_count = self._nodes_up(None)
         under_min = self.min_nodes - total_up_count
         over_max = total_up_count - self.max_nodes
+        total_price = self._total_price()
+
         if over_max >= 0:
             return -over_max
+        elif self.max_total_price and ((total_price + size.price) > self.max_total_price):
+            self._logger.info("Not booting new %s (price %s) because with current total_price of %s it would exceed max_total_price of %s",
+                              size.name, size.price, total_price, self.max_total_price)
+            return 0
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
-        else:
-            up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
-                                               self._nodes_busy(size) +
-                                               self._nodes_missing(size))
-            return self._size_wishlist(size) - up_count
+
+        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+                                           self._nodes_busy(size) +
+                                           self._nodes_missing(size))
+        return self._size_wishlist(size) - up_count
 
     def _nodes_excess(self, size):
         up_count = self._nodes_up(size) - self._size_shutdowns(size)
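
To make the new check concrete: _total_price() sums the hourly price of
every booting, booted, and running node, and _nodes_wanted() refuses to
boot a size that would push that sum past the cap. A standalone sketch of
the arithmetic, with hypothetical numbers (illustration only, not part of
this commit):

    max_total_price = 4.0   # $/hr cap from config; 0 disables the check
    total_price = 3.0       # current spend: booting + booted + running
    size_price = 2.0        # hourly price of the size we might boot

    if max_total_price and (total_price + size_price) > max_total_price:
        nodes_wanted = 0    # 3.0 + 2.0 = 5.0 $/hr exceeds the 4.0 cap

Note the ordering above: the max_nodes ceiling is applied first, the price
cap second, and the min_nodes floor last, so a tight max_total_price can
hold the pool below min_nodes.
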
index ebe79fde345fe526575befea0d66f6cb7a66099f..06f66b71c244f5662fb0c9871f7c92b0603f1617 100644 (file)
@@ -83,12 +83,6 @@ class ServerCalculator(object):
     def cheapest_size(self):
         return self.cloud_sizes[0]
 
-    def find_size(self, sz):
-        for s in self.cloud_sizes:
-            if s.id == sz.id:
-                return s
-        return None
-
 
 class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to generate server wishlists from the job queue.
index 5dfdb1d071f085c76f39bf53ea7a359581fbfb67..592d217583c481893f9294315c626a0f55153e0e 100644 (file)
@@ -120,7 +120,8 @@ def main(args=None):
         config.getint('Daemon', 'poll_stale_after'),
         config.getint('Daemon', 'boot_fail_after'),
         config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor).proxy()
+        node_setup, node_shutdown, node_monitor,
+        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
 
     signal.pause()
     daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
index 50161c288c723a7a9a2753622a8f2481fd6484f7..ef56ce146d72d2271cd912333fbf72040cb2a211 100644 (file)
@@ -15,6 +15,11 @@ min_nodes = 0
 # many are running.
 max_nodes = 8
 
+# Upper limit on the rate of spending (in $/hr): Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # Poll Azure nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -138,16 +143,25 @@ tag_cluster = zyxwv
 # the API server to ping
 ping_host = hostname:port
 
-[Size Standard_D3]
-# You can define any number of Size sections to list Azure sizes you're
-# willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use.  The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue.  You must also provide the price per hour, as the Azure compute
+# driver does not currently report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
 # Each size section MUST define the number of cores available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Microsoft's provided
-# data fields by setting the same names here.
+# data fields by setting them here.
+
+[Size Standard_D3]
 cores = 4
-scratch = 200
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
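
As a worked example with the prices above (not part of the shipped file):
starting from an empty pool, a cap of max_total_price = 2.0 admits three
Standard_D3 nodes (3 x 0.56 = 1.68 $/hr) but blocks a fourth
(1.68 + 0.56 = 2.24 > 2.0), and admits only one Standard_D4, since a
second would bring the total to 2.24 $/hr.
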
index 9b41ca14d54fc52956650649170574e90d99ae78..5c882e5a1e25d34e919c6787e2e96a5505881582 100644 (file)
@@ -15,6 +15,11 @@ min_nodes = 0
 # many are running.
 max_nodes = 8
 
+# Upper limit on the rate of spending (in $/hr): Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # Poll EC2 nodes and Arvados for new information every N seconds.
 poll_time = 60
 
@@ -123,16 +128,24 @@ subnet_id = idstring
 # compute node.
 security_groups = idstring1, idstring2
 
-[Size t2.medium]
+
 # You can define any number of Size sections to list EC2 sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
+#
 # Each size section MUST define the number of cores available in this
 # size class (since libcloud does not provide any consistent API for exposing
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.  You can also override Amazon's provided
-# data fields by setting the same names here.
+# data fields (such as price per hour) by setting them here.
+
+[Size m4.large]
 cores = 2
+price = 0.126
+scratch = 100
+
+[Size m4.xlarge]
+cores = 4
+price = 0.252
 scratch = 100
index 67703703ba51b8ced8eac18b0ab9f72d65b16b08..2da9c5ffddb5a58527cf35bb9245f457ee3420bc 100644 (file)
@@ -17,6 +17,11 @@ poll_time = 60
 # This is the longest time to wait between polls.
 max_poll_time = 300
 
+# Upper limit on the rate of spending (in $/hr): Node Manager will not boot
+# additional nodes if the total price of the nodes already running meets or
+# exceeds this threshold.  The default of 0 means no limit.
+max_total_price = 0
+
 # If Node Manager can't successfully poll a service for this long,
 # it will never start or stop compute nodes, on the assumption that its
 # information is too outdated.
@@ -118,11 +123,10 @@ image = debian-7
 # See http://libcloud.readthedocs.org/en/latest/compute/drivers/gce.html#specifying-service-account-scopes
 # service_accounts = [{'email':'account@example.com', 'scopes':['storage-ro']}]
 
-[Size n1-standard-2]
+
 # You can define any number of Size sections to list node sizes you're
 # willing to use.  The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
 #
 # The Size fields are interpreted the same way as with a libcloud NodeSize:
 # http://libcloud.readthedocs.org/en/latest/compute/api.html#libcloud.compute.base.NodeSize
@@ -135,6 +139,15 @@ image = debian-7
 # this setting).
 # You may also want to define the amount of scratch space (expressed
 # in GB) for Crunch jobs.
+# You can also override Google's provided data fields (such as price per hour)
+# by setting them here.
+
+[Size n1-standard-2]
 cores = 2
+price = 0.076
 scratch = 100
-ram = 512
+
+[Size n1-standard-4]
+cores = 4
+price = 0.152
+scratch = 200
\ No newline at end of file
index 865191ebad50fbdfb831d3e069ca906681371a14..2510c79470edf2a6787981a15feafb60fdfc37b9 100644 (file)
@@ -49,7 +49,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
     def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
                     avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
                     min_nodes=0, max_nodes=8,
-                    shutdown_windows=[54, 5, 1]):
+                    shutdown_windows=[54, 5, 1],
+                    max_total_price=None):
         for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
             setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
         self.arv_factory = mock.MagicMock(name='arvados_mock')
@@ -71,7 +72,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
             self.arv_factory, self.cloud_factory,
             shutdown_windows, ServerCalculator(avail_sizes),
             min_nodes, max_nodes, 600, 1800, 3600,
-            self.node_setup, self.node_shutdown).proxy()
+            self.node_setup, self.node_shutdown,
+            max_total_price=max_total_price).proxy()
         if cloud_nodes is not None:
             self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
         if arvados_nodes is not None:
@@ -597,16 +599,40 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.stop_proxy(self.daemon)
         self.assertEqual(1, self.last_shutdown.stop.call_count)
 
+    def busywait(self, f):
+        n = 0
+        while not f() and n < 10:
+            time.sleep(.1)
+            n += 1
+        self.assertTrue(f())
+
     def test_node_create_two_sizes(self):
         small = testutil.MockSize(1)
         big = testutil.MockSize(2)
         avail_sizes = [(testutil.MockSize(1), {"cores":1}),
                         (testutil.MockSize(2), {"cores":2})]
-        self.make_daemon(want_sizes=[small, small, big],
-                         avail_sizes=avail_sizes)
-        booting = self.daemon.booting.get()
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=4)
+        self.busywait(lambda: self.node_setup.start.call_count == 4)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+        self.assertEqual(3, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
+
+    def test_node_max_nodes_two_sizes(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+                        (testutil.MockSize(2), {"cores":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes, max_nodes=3)
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
-        self.assertEqual(3, self.node_setup.start.call_count)
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in booting.itervalues():
             sizecounts[b.cloud_size.get().id] += 1
@@ -643,14 +669,38 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(1, self.node_setup.start.call_count)
         self.assertEqual(1, self.node_shutdown.start.call_count)
 
+        # booting a new big node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in booting.itervalues():
             sizecounts[b.cloud_size.get().id] += 1
         self.assertEqual(0, sizecounts[small.id])
         self.assertEqual(1, sizecounts[big.id])
 
+        # shutting down a small node
         sizecounts = {a[0].id: 0 for a in avail_sizes}
         for b in shutdowns.itervalues():
             sizecounts[b.cloud_node.get().size.id] += 1
         self.assertEqual(1, sizecounts[small.id])
         self.assertEqual(0, sizecounts[big.id])
+
+    def test_node_max_price(self):
+        small = testutil.MockSize(1)
+        big = testutil.MockSize(2)
+        avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+                        (testutil.MockSize(2), {"cores":2, "price":2})]
+        self.make_daemon(want_sizes=[small, small, small, big],
+                         avail_sizes=avail_sizes,
+                         max_nodes=4,
+                         max_total_price=4)
+        self.busywait(lambda: self.node_setup.start.call_count == 3)
+        booting = self.daemon.booting.get(self.TIMEOUT)
+        self.stop_proxy(self.daemon)
+        self.assertEqual(3, self.node_setup.start.call_count)
+        sizecounts = {a[0].id: 0 for a in avail_sizes}
+        for b in booting.itervalues():
+            sizecounts[b.cloud_size.get().id] += 1
+        logging.info(sizecounts)
+        # update_server_wishlist() effectively boots nodes round-robin,
+        # one of each size in the wishlist, until the price cap is hit
+        self.assertEqual(2, sizecounts[small.id])
+        self.assertEqual(1, sizecounts[big.id])
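
Working through the cap in test_node_max_price: the round-robin boots one
small node ($1/hr) and one big node ($2/hr) for $3 total; a second small
node brings the total to exactly $4, which the strict > comparison still
allows; a third small node would mean $5 > $4, so setup stops at three
nodes, matching the 2 small / 1 big counts asserted above.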
index 4c97aed8b109a576843105bad7cf7f62b14426ce..2ddecd0973d08faabecf87287417efc1472e8762 100644 (file)
@@ -57,6 +57,45 @@ class ServerCalculatorTestCase(unittest.TestCase):
         servcalc = self.make_calculator([2, 4, 1, 3])
         self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
 
+    def test_next_biggest(self):
+        servcalc = self.make_calculator([1, 2, 4, 8])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 3},
+                                  {'min_cores_per_node': 6})
+        self.assertEqual([servcalc.cloud_sizes[2].id,
+                          servcalc.cloud_sizes[3].id],
+                         [s.id for s in servlist])
+
+    def test_multiple_sizes(self):
+        servcalc = self.make_calculator([1, 2])
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2},
+                                  {'min_cores_per_node': 1})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id,
+                          servcalc.cloud_sizes[0].id],
+                         [s.id for s in servlist])
+
+        servlist = self.calculate(servcalc,
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 1},
+                                  {'min_cores_per_node': 2})
+        self.assertEqual([servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[0].id,
+                          servcalc.cloud_sizes[1].id],
+                         [s.id for s in servlist])
+
+
 
 class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
                                    unittest.TestCase):
@@ -82,4 +121,3 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
 if __name__ == '__main__':
     unittest.main()
-
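
For reference on the new ServerCalculator tests: assuming make_calculator's
argument lists the core counts of the mock sizes, cheapest first (as the
existing cheapest_size test suggests), test_next_biggest maps a 3-core
request to cloud_sizes[2] (4 cores) and a 6-core request to cloud_sizes[3]
(8 cores), while test_multiple_sizes checks that the wishlist preserves job
order while assigning each job the smallest adequate size. A minimal sketch
of that selection rule (hypothetical, not the actual ServerCalculator code):

    def smallest_adequate(cloud_sizes, min_cores):
        # cloud_sizes is assumed sorted cheapest-first, per cheapest_size().
        for size in cloud_sizes:
            if size.cores >= min_cores:
                return size
        return None  # no configured size can run this job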