projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '8543-nodemanager-fewer-futures' closes #8543
[arvados.git]
/
services
/
nodemanager
/
arvnodeman
/
daemon.py
diff --git a/services/nodemanager/arvnodeman/daemon.py b/services/nodemanager/arvnodeman/daemon.py
index 47e77d13449160d26c4b83c514f3a6dc11bdd850..33b6cd58f6aff2897cef4c89d0c4b60a149b0ee4 100644
(file)
--- a/services/nodemanager/arvnodeman/daemon.py
+++ b/services/nodemanager/arvnodeman/daemon.py
@@ -121,7 +121,7 @@ class NodeManagerDaemonActor(actor_class):
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
self._new_arvados = arvados_factory
self._new_cloud = cloud_factory
self._cloud_driver = self._new_cloud()
- self._later = self.actor_ref.proxy()
+ self._later = self.actor_ref.tell_proxy()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.min_cloud_size = self.server_calculator.cheapest_size()
self.shutdown_windows = shutdown_windows
self.server_calculator = server_calculator
self.min_cloud_size = self.server_calculator.cheapest_size()
@@ -142,6 +142,7 @@ class NodeManagerDaemonActor(actor_class):
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
self.booting = {} # Actor IDs to ComputeNodeSetupActors
self.booted = {} # Cloud node IDs to _ComputeNodeRecords
self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
+ self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
def on_start(self):
self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
def on_start(self):
self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
@@ -173,11 +174,12 @@ class NodeManagerDaemonActor(actor_class):
poll_stale_after=self.poll_stale_after,
node_stale_after=self.node_stale_after,
cloud_client=self._cloud_driver,
poll_stale_after=self.poll_stale_after,
node_stale_after=self.node_stale_after,
cloud_client=self._cloud_driver,
- boot_fail_after=self.boot_fail_after).proxy()
- actor.subscribe(self._later.node_can_shutdown)
+ boot_fail_after=self.boot_fail_after)
+ actorTell = actor.tell_proxy()
+ actorTell.subscribe(self._later.node_can_shutdown)
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
self._cloud_nodes_actor.subscribe_to(cloud_node.id,
- actor.update_cloud_node)
- record = _ComputeNodeRecord(actor, cloud_node)
+ actorTell.update_cloud_node)
+ record = _ComputeNodeRecord(actor.proxy(), cloud_node)
return record
def update_cloud_nodes(self, nodelist):
return record
def update_cloud_nodes(self, nodelist):
@@ -200,6 +202,7 @@ class NodeManagerDaemonActor(actor_class):
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
except pykka.ActorDeadError:
pass
del self.shutdowns[key]
+ del self.sizes_booting_shutdown[key]
record.actor.stop()
record.cloud_node = None
record.actor.stop()
record.cloud_node = None
@@ -218,8 +221,8 @@ class NodeManagerDaemonActor(actor_class):
def _nodes_booting(self, size):
s = sum(1
def _nodes_booting(self, size):
s = sum(1
- for c in self.booting.itervalues()
- if size is None or c.cloud_size.get().id == size.id)
+ for c in self.booting.iterkeys()
+ if size is None or self.sizes_booting_shutdown[c].id == size.id)
s += sum(1
for c in self.booted.itervalues()
if size is None or c.cloud_node.size.id == size.id)
s += sum(1
for c in self.booted.itervalues()
if size is None or c.cloud_node.size.id == size.id)
@@ -241,8 +244,8 @@ class NodeManagerDaemonActor(actor_class):
def _total_price(self):
cost = 0
def _total_price(self):
cost = 0
- cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
- for c in self.booting.itervalues())
+ cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+ for c in self.booting.iterkeys())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
for i in (self.booted, self.cloud_nodes.nodes)
for c in i.itervalues())
cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
for i in (self.booted, self.cloud_nodes.nodes)
for c in i.itervalues())
@@ -267,9 +270,9 @@ class NodeManagerDaemonActor(actor_class):
def _size_shutdowns(self, size):
sh = 0
def _size_shutdowns(self, size):
sh = 0
- for c in self.shutdowns.itervalues():
+ for c in self.shutdowns.iterkeys():
try:
try:
- if c.cloud_node.get().size.id == size.id:
+ if self.sizes_booting_shutdown[c].id == size.id:
sh += 1
except pykka.ActorDeadError:
pass
sh += 1
except pykka.ActorDeadError:
pass
@@ -358,8 +361,10 @@ class NodeManagerDaemonActor(actor_class):
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
arvados_client=self._new_arvados(),
arvados_node=arvados_node,
cloud_client=self._new_cloud(),
- cloud_size=cloud_size).proxy()
+ cloud_size=cloud_size).tell_proxy()
self.booting[new_setup.actor_ref.actor_urn] = new_setup
self.booting[new_setup.actor_ref.actor_urn] = new_setup
+ self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
if arvados_node is not None:
self.arvados_nodes[arvados_node['uuid']].assignment_time = (
time.time())
@@ -373,6 +378,8 @@ class NodeManagerDaemonActor(actor_class):
def node_up(self, setup_proxy):
cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
def node_up(self, setup_proxy):
cloud_node = setup_proxy.cloud_node.get()
del self.booting[setup_proxy.actor_ref.actor_urn]
+ del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
setup_proxy.stop()
if cloud_node is not None:
record = self.cloud_nodes.get(cloud_node.id)
setup_proxy.stop()
if cloud_node is not None:
record = self.cloud_nodes.get(cloud_node.id)
@@ -390,12 +397,15 @@ class NodeManagerDaemonActor(actor_class):
for key, node in self.booting.iteritems():
if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
for key, node in self.booting.iteritems():
if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
del self.booting[key]
+ del self.sizes_booting_shutdown[key]
+
if nodes_excess > 1:
self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
if nodes_excess > 1:
self._later.stop_booting_node(size)
break
def _begin_node_shutdown(self, node_actor, cancellable):
- cloud_node_id = node_actor.cloud_node.get().id
+ cloud_node_obj = node_actor.cloud_node.get()
+ cloud_node_id = cloud_node_obj.id
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
if cloud_node_id in self.shutdowns:
return None
shutdown = self._node_shutdown.start(
@@ -403,7 +413,8 @@ class NodeManagerDaemonActor(actor_class):
arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
arvados_client=self._new_arvados(),
node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
self.shutdowns[cloud_node_id] = shutdown
- shutdown.subscribe(self._later.node_finished_shutdown)
+ self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
+ shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
@@ -428,10 +439,10 @@ class NodeManagerDaemonActor(actor_class):
if not success:
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
if not success:
if cancel_reason == self._node_shutdown.NODE_BROKEN:
self.cloud_nodes.blacklist(cloud_node_id)
- del self.shutdowns[cloud_node_id]
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
elif cloud_node_id in self.booted:
self.booted.pop(cloud_node_id).actor.stop()
- del self.shutdowns[cloud_node_id]
+ del self.shutdowns[cloud_node_id]
+ del self.sizes_booting_shutdown[cloud_node_id]
def shutdown(self):
self._logger.info("Shutting down after signal.")
def shutdown(self):
self._logger.info("Shutting down after signal.")