Updated config file examples.
'poll_time': '60',
'max_poll_time': '300',
'poll_stale_after': '600',
+ 'max_total_price': 0,
'boot_fail_after': str(sys.maxint),
'node_stale_after': str(60 * 60 * 2)},
'Logging': {'file': '/dev/stderr',
node_stale_after=7200,
node_setup_class=dispatch.ComputeNodeSetupActor,
node_shutdown_class=dispatch.ComputeNodeShutdownActor,
- node_actor_class=dispatch.ComputeNodeMonitorActor):
+ node_actor_class=dispatch.ComputeNodeMonitorActor,
+ max_total_price=0):
super(NodeManagerDaemonActor, self).__init__()
self._node_setup = node_setup_class
self._node_shutdown = node_shutdown_class
self.min_cloud_size = self.server_calculator.cheapest_size()
self.min_nodes = min_nodes
self.max_nodes = max_nodes
+ self.max_total_price = max_total_price
self.poll_stale_after = poll_stale_after
self.boot_fail_after = boot_fail_after
self.node_stale_after = node_stale_after
if size is None or c.cloud_node.size.id == size.id)
return up
+ def _total_price(self):
+ # Total hourly price of all compute nodes under management:
+ # nodes still booting (price taken from the requested cloud size)
+ # plus nodes already booted or listed by the cloud provider.
+ # Used to decide whether booting another node would exceed
+ # max_total_price (0 means no limit).
+ cost = 0
+ cost += sum(c.cloud_size.get().price
+ for c in self.booting.itervalues())
+ cost += sum(c.cloud_node.size.price
+ for i in (self.booted, self.cloud_nodes.nodes)
+ for c in i.itervalues())
+ return cost
+
def _nodes_busy(self, size):
return sum(1 for busy in
pykka.get_all(rec.actor.in_state('busy') for rec in
total_up_count = self._nodes_up(None)
under_min = self.min_nodes - total_up_count
over_max = total_up_count - self.max_nodes
+ total_price = self._total_price()
+
if over_max >= 0:
return -over_max
+ elif self.max_total_price and ((total_price + size.price) > self.max_total_price):
+ self._logger.info("Not booting new %s (price %s) because with current total_price of %s it would exceed max_total_price of %s",
+ size.name, size.price, total_price, self.max_total_price)
+ return 0
elif under_min > 0 and size.id == self.min_cloud_size.id:
return under_min
- else:
- up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
- self._nodes_busy(size) +
- self._nodes_missing(size))
- return self._size_wishlist(size) - up_count
+
+ up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
+ self._nodes_busy(size) +
+ self._nodes_missing(size))
+ return self._size_wishlist(size) - up_count
def _nodes_excess(self, size):
up_count = self._nodes_up(size) - self._size_shutdowns(size)
def cheapest_size(self):
return self.cloud_sizes[0]
- def find_size(self, sz):
- for s in self.cloud_sizes:
- if s.id == sz.id:
- return s
- return None
-
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to generate server wishlists from the job queue.
config.getint('Daemon', 'poll_stale_after'),
config.getint('Daemon', 'boot_fail_after'),
config.getint('Daemon', 'node_stale_after'),
- node_setup, node_shutdown, node_monitor).proxy()
+ node_setup, node_shutdown, node_monitor,
+ max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
signal.pause()
daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
# many are running.
max_nodes = 8
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
# Poll Azure nodes and Arvados for new information every N seconds.
poll_time = 60
# the API server to ping
ping_host = hostname:port
-[Size Standard_D3]
-# You can define any number of Size sections to list Azure sizes you're
-# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# You can define any number of Size sections to list Azure sizes you're willing
+# to use. The Node Manager should boot the cheapest size(s) that can run jobs
+# in the queue. You must also provide price per hour as the Azure compute
+# driver currently does not report prices.
+#
+# See https://azure.microsoft.com/en-us/pricing/details/virtual-machines/
+# for a list of known machine types that may be used as a Size parameter.
+#
# Each size section MUST define the number of cores available in this
# size class (since libcloud does not provide any consistent API for exposing
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs. You can also override Microsoft's provided
-# data fields by setting the same names here.
+# data fields by setting them here.
+
+[Size Standard_D3]
cores = 4
-scratch = 200
+price = 0.56
+
+[Size Standard_D4]
+cores = 8
+price = 1.12
# many are running.
max_nodes = 8
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
# Poll EC2 nodes and Arvados for new information every N seconds.
poll_time = 60
# compute node.
security_groups = idstring1, idstring2
-[Size t2.medium]
+
# You can define any number of Size sections to list EC2 sizes you're
# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
+#
# Each size section MUST define the number of cores available in this
# size class (since libcloud does not provide any consistent API for exposing
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs. You can also override Amazon's provided
-# data fields by setting the same names here.
+# data fields (such as price per hour) by setting them here.
+
+[Size m4.large]
cores = 2
+price = 0.126
+scratch = 100
+
+[Size m4.xlarge]
+cores = 4
+price = 0.252
scratch = 100
# This is the longest time to wait between polls.
max_poll_time = 300
+# Upper limit on rate of spending (in $/hr), will not boot additional nodes
+# if total price of already running nodes meets or exceeds this threshold.
+# default 0 means no limit.
+max_total_price = 0
+
# If Node Manager can't successfully poll a service for this long,
# it will never start or stop compute nodes, on the assumption that its
# information is too outdated.
# See http://libcloud.readthedocs.org/en/latest/compute/drivers/gce.html#specifying-service-account-scopes
# service_accounts = [{'email':'account@example.com', 'scopes':['storage-ro']}]
-[Size n1-standard-2]
+
# You can define any number of Size sections to list node sizes you're
# willing to use. The Node Manager should boot the cheapest size(s) that
-# can run jobs in the queue (N.B.: defining more than one size has not been
-# tested yet).
+# can run jobs in the queue.
#
# The Size fields are interpreted the same way as with a libcloud NodeSize:
# http://libcloud.readthedocs.org/en/latest/compute/api.html#libcloud.compute.base.NodeSize
# this setting).
# You may also want to define the amount of scratch space (expressed
# in GB) for Crunch jobs.
+# You can also override Google's provided data fields (such as price per hour)
+# by setting them here.
+
+[Size n1-standard-2]
cores = 2
+price = 0.076
scratch = 100
-ram = 512
+
+[Size n1-standard-4]
+cores = 4
+price = 0.152
+scratch = 200
\ No newline at end of file
def make_daemon(self, cloud_nodes=[], arvados_nodes=[], want_sizes=[],
avail_sizes=[(testutil.MockSize(1), {"cores": 1})],
min_nodes=0, max_nodes=8,
- shutdown_windows=[54, 5, 1]):
+ shutdown_windows=[54, 5, 1],
+ max_total_price=None):
for name in ['cloud_nodes', 'arvados_nodes', 'server_wishlist']:
setattr(self, name + '_poller', mock.MagicMock(name=name + '_mock'))
self.arv_factory = mock.MagicMock(name='arvados_mock')
self.arv_factory, self.cloud_factory,
shutdown_windows, ServerCalculator(avail_sizes),
min_nodes, max_nodes, 600, 1800, 3600,
- self.node_setup, self.node_shutdown).proxy()
+ self.node_setup, self.node_shutdown,
+ max_total_price=max_total_price).proxy()
if cloud_nodes is not None:
self.daemon.update_cloud_nodes(cloud_nodes).get(self.TIMEOUT)
if arvados_nodes is not None:
self.stop_proxy(self.daemon)
self.assertEqual(1, self.last_shutdown.stop.call_count)
+ def busywait(self, f):
+ # Poll the predicate f up to 10 times, sleeping 0.1s between
+ # attempts (about one second total), then assert it finally holds.
+ n = 0
+ while not f() and n < 10:
+ time.sleep(.1)
+ n += 1
+ self.assertTrue(f())
+
def test_node_create_two_sizes(self):
small = testutil.MockSize(1)
big = testutil.MockSize(2)
avail_sizes = [(testutil.MockSize(1), {"cores":1}),
(testutil.MockSize(2), {"cores":2})]
- self.make_daemon(want_sizes=[small, small, big],
- avail_sizes=avail_sizes)
- booting = self.daemon.booting.get()
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=4)
+ self.busywait(lambda: self.node_setup.start.call_count == 4)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+ self.assertEqual(3, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
+
+ def test_node_max_nodes_two_sizes(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=3)
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get(self.TIMEOUT)
self.stop_proxy(self.daemon)
- self.assertEqual(3, self.node_setup.start.call_count)
sizecounts = {a[0].id: 0 for a in avail_sizes}
for b in booting.itervalues():
sizecounts[b.cloud_size.get().id] += 1
self.assertEqual(1, self.node_setup.start.call_count)
self.assertEqual(1, self.node_shutdown.start.call_count)
+ # booting a new big node
sizecounts = {a[0].id: 0 for a in avail_sizes}
for b in booting.itervalues():
sizecounts[b.cloud_size.get().id] += 1
self.assertEqual(0, sizecounts[small.id])
self.assertEqual(1, sizecounts[big.id])
+ # shutting down a small node
sizecounts = {a[0].id: 0 for a in avail_sizes}
for b in shutdowns.itervalues():
sizecounts[b.cloud_node.get().size.id] += 1
self.assertEqual(1, sizecounts[small.id])
self.assertEqual(0, sizecounts[big.id])
+
+ def test_node_max_price(self):
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1, "price":1}),
+ (testutil.MockSize(2), {"cores":2, "price":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes,
+ max_nodes=4,
+ max_total_price=4)
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get()
+ self.stop_proxy(self.daemon)
+ self.assertEqual(3, self.node_setup.start.call_count)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ logging.info(sizecounts)
+ # The way the update_server_wishlist() works effectively results in a
+ # round-robin creation of one node of each size in the wishlist
+ self.assertEqual(2, sizecounts[small.id])
+ self.assertEqual(1, sizecounts[big.id])
servcalc = self.make_calculator([2, 4, 1, 3])
self.assertEqual(testutil.MockSize(1), servcalc.cheapest_size())
+ def test_next_biggest(self):
+ servcalc = self.make_calculator([1, 2, 4, 8])
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 3},
+ {'min_cores_per_node': 6})
+ self.assertEqual([servcalc.cloud_sizes[2].id,
+ servcalc.cloud_sizes[3].id],
+ [s.id for s in servlist])
+
+ def test_multiple_sizes(self):
+ servcalc = self.make_calculator([1, 2])
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1})
+ self.assertEqual([servcalc.cloud_sizes[1].id,
+ servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[0].id],
+ [s.id for s in servlist])
+
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2},
+ {'min_cores_per_node': 1})
+ self.assertEqual([servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[1].id,
+ servcalc.cloud_sizes[0].id],
+ [s.id for s in servlist])
+
+ servlist = self.calculate(servcalc,
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 1},
+ {'min_cores_per_node': 2})
+ self.assertEqual([servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[0].id,
+ servcalc.cloud_sizes[1].id],
+ [s.id for s in servlist])
+
+
class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
unittest.TestCase):
if __name__ == '__main__':
unittest.main()
-