3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .config import actor_class
14 class _ComputeNodeRecord(object):
15 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16 assignment_time=float('-inf')):
18 self.cloud_node = cloud_node
19 self.arvados_node = arvados_node
20 self.assignment_time = assignment_time
23 class _BaseNodeTracker(object):
28 def __getitem__(self, key):
29 return self.nodes[key]
32 return len(self.nodes)
34 def get(self, key, default=None):
35 return self.nodes.get(key, default)
37 def record_key(self, record):
38 return self.item_key(getattr(record, self.RECORD_ATTR))
40 def add(self, record):
41 self.nodes[self.record_key(record)] = record
43 def update_record(self, key, item):
44 setattr(self.nodes[key], self.RECORD_ATTR, item)
46 def update_from(self, response):
47 unseen = set(self.nodes.iterkeys())
49 key = self.item_key(item)
52 self.update_record(key, item)
55 self.orphans = {key: self.nodes.pop(key) for key in unseen}
58 return (record for record in self.nodes.itervalues()
59 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track compute node records, keyed by cloud node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # The cloud driver's node ID is the canonical key.
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return one Arvados node record that looks abandoned, or None.

        A node is stale when both its modification time and its last
        assignment time are older than stale_time seconds.
        """
        # BUG FIX: itervalues() is Python 2-only; values() is equivalent.
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
84 class NodeManagerDaemonActor(actor_class):
85 """Node Manager daemon.
87 This actor subscribes to all information polls about cloud nodes,
88 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
89 for every cloud node, subscribing them to poll updates
90 appropriately. It creates and destroys cloud nodes based on job queue
91 demand, and stops the corresponding ComputeNode actors when their work
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
             cloud_nodes_actor, cloud_update_actor, timer_actor,
             arvados_factory, cloud_factory,
             shutdown_windows, max_nodes,
             poll_stale_after=600, node_stale_after=7200,
             node_setup_class=cnode.ComputeNodeSetupActor,
             node_shutdown_class=cnode.ComputeNodeShutdownActor,
             node_actor_class=cnode.ComputeNodeMonitorActor):
    """Wire up poll subscriptions, factories, and tracking state.

    The three *_actor arguments are poll actors this daemon subscribes
    to; arvados_factory/cloud_factory build fresh API clients for the
    worker actors it spawns.
    """
    super(NodeManagerDaemonActor, self).__init__()
    self._node_setup = node_setup_class
    self._node_shutdown = node_shutdown_class
    self._node_actor = node_actor_class
    self._cloud_updater = cloud_update_actor
    self._timer = timer_actor
    self._new_arvados = arvados_factory
    self._new_cloud = cloud_factory
    self._cloud_driver = self._new_cloud()
    self._logger = logging.getLogger('arvnodeman.daemon')
    # Proxy to ourselves, so message handlers can schedule follow-up
    # work through the actor mailbox instead of calling directly.
    self._later = self.actor_ref.proxy()
    self.shutdown_windows = shutdown_windows
    self.max_nodes = max_nodes
    self.poll_stale_after = poll_stale_after
    self.node_stale_after = node_stale_after
    # BUG FIX: last_polls must exist before the subscription loop below
    # stores a timestamp per poll name.  Seed each entry in the past so
    # _check_poll_freshness inhibits actions until real polls arrive.
    self.last_polls = {}
    # Explicit (name, actor) pairs instead of the brittle
    # locals()[poll_name + '_actor'] lookup.
    for poll_name, poll_actor in [
            ('server_wishlist', server_wishlist_actor),
            ('arvados_nodes', arvados_nodes_actor),
            ('cloud_nodes', cloud_nodes_actor)]:
        poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
        setattr(self, '_{}_actor'.format(poll_name), poll_actor)
        self.last_polls[poll_name] = -self.poll_stale_after
    self.cloud_nodes = _CloudNodeTracker()
    self.arvados_nodes = _ArvadosNodeTracker()
    self.booting = {}    # Actor IDs to ComputeNodeSetupActors
    self.booted = {}     # Cloud node IDs to _ComputeNodeRecords
    self.shutdowns = {}  # Cloud node IDs to ComputeNodeShutdownActors
    self._logger.debug("Daemon initialized")
def _update_poll_time(self, poll_key):
    # Record when we last heard from this poll subscription; read by
    # _check_poll_freshness to decide whether state is fresh enough
    # to act on.
    self.last_polls[poll_key] = time.time()
133 def _pair_nodes(self, node_record, arvados_node):
134 self._logger.info("Cloud node %s has associated with Arvados node %s",
135 node_record.cloud_node.id, arvados_node['uuid'])
136 self._arvados_nodes_actor.subscribe_to(
137 arvados_node['uuid'], node_record.actor.update_arvados_node)
138 node_record.arvados_node = arvados_node
139 self.arvados_nodes.add(node_record)
def _new_node(self, cloud_node):
    """Start a monitor actor for cloud_node and return its record.

    BUG FIX: the record was built but never returned, although every
    caller assigns the result (update_cloud_nodes, node_up).
    """
    start_time = self._cloud_driver.node_start_time(cloud_node)
    shutdown_timer = cnode.ShutdownTimer(start_time,
                                         self.shutdown_windows)
    actor = self._node_actor.start(
        cloud_node=cloud_node,
        cloud_node_start_time=start_time,
        shutdown_timer=shutdown_timer,
        update_actor=self._cloud_updater,
        timer_actor=self._timer,
        # No Arvados node yet; pairing happens later via offer_arvados_pair.
        arvados_node=None,
        poll_stale_after=self.poll_stale_after,
        node_stale_after=self.node_stale_after).proxy()
    actor.subscribe(self._later.node_can_shutdown)
    self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                         actor.update_cloud_node)
    record = _ComputeNodeRecord(actor, cloud_node)
    return record
def update_cloud_nodes(self, nodelist):
    """Handle a cloud node listing poll.

    New cloud nodes get monitor actors (or are claimed from self.booted)
    and are offered to unpaired Arvados records; nodes missing from the
    listing are torn down.
    """
    self._update_poll_time('cloud_nodes')
    for key, node in self.cloud_nodes.update_from(nodelist):
        self._logger.info("Registering new cloud node %s", key)
        if key in self.booted:
            record = self.booted.pop(key)
        else:
            record = self._new_node(node)
        self.cloud_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                self._pair_nodes(record, arv_rec.arvados_node)
                break
    # Nodes that vanished from the cloud listing: stop their monitor
    # actors and drop any pending shutdown bookkeeping.
    # (iteritems() is Python 2-only; items() is equivalent here.)
    for key, record in self.cloud_nodes.orphans.items():
        record.actor.stop()
        self.shutdowns.pop(key, None)
def update_arvados_nodes(self, nodelist):
    """Handle an Arvados node listing poll.

    Registers records for newly seen Arvados nodes, then tries to pair
    every unpaired Arvados record with an unpaired cloud node.
    """
    self._update_poll_time('arvados_nodes')
    for key, node in self.arvados_nodes.update_from(nodelist):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=node)
        self.arvados_nodes.add(record)
    for arv_rec in self.arvados_nodes.unpaired():
        arv_node = arv_rec.arvados_node
        for cloud_rec in self.cloud_nodes.unpaired():
            if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                self._pair_nodes(cloud_rec, arv_node)
                # Each Arvados node pairs with at most one cloud node.
                break
190 def _node_count(self):
191 up = sum(len(nodelist) for nodelist in
192 [self.cloud_nodes, self.booted, self.booting])
193 return up - len(self.shutdowns)
195 def _nodes_wanted(self):
196 return len(self.last_wishlist) - self._node_count()
198 def _nodes_excess(self):
199 return -self._nodes_wanted()
201 def update_server_wishlist(self, wishlist):
202 self._update_poll_time('server_wishlist')
203 self.last_wishlist = wishlist[:self.max_nodes]
204 nodes_wanted = self._nodes_wanted()
206 self._later.start_node()
207 elif (nodes_wanted < 0) and self.booting:
208 self._later.stop_booting_node()
210 def _check_poll_freshness(orig_func):
211 """Decorator to inhibit a method when poll information is stale.
213 This decorator checks the timestamps of all the poll information the
214 daemon has received. The decorated method is only called if none
215 of the timestamps are considered stale.
217 @functools.wraps(orig_func)
218 def wrapper(self, *args, **kwargs):
220 if all(now - t < self.poll_stale_after
221 for t in self.last_polls.itervalues()):
222 return orig_func(self, *args, **kwargs)
@_check_poll_freshness
def start_node(self):
    """Boot one new node when the wishlist calls for more.

    Reuses a stale Arvados node record when one exists, then reschedules
    itself if more than one node is still wanted.
    """
    nodes_wanted = self._nodes_wanted()
    if nodes_wanted < 1:
        return None
    arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
    cloud_size = self.last_wishlist[nodes_wanted - 1]
    self._logger.info("Want %s more nodes. Booting a %s node.",
                      nodes_wanted, cloud_size.name)
    new_setup = self._node_setup.start(
        timer_actor=self._timer,
        arvados_client=self._new_arvados(),
        arvados_node=arvados_node,
        cloud_client=self._new_cloud(),
        cloud_size=cloud_size).proxy()
    self.booting[new_setup.actor_ref.actor_urn] = new_setup
    if arvados_node is not None:
        # Stamp the claimed Arvados node so find_stale_node does not
        # hand it to another setup actor right away.
        self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            time.time())
    new_setup.subscribe(self._later.node_up)
    if nodes_wanted > 1:
        self._later.start_node()
def _actor_nodes(self, node_actor):
    # Fetch both node attributes from the actor proxy in one batch;
    # pykka.get_all blocks until both futures resolve and returns
    # [cloud_node, arvados_node] in order.
    return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
def node_up(self, setup_proxy):
    """Handle a setup actor reporting its node finished booting.

    Moves the node out of self.booting; if the cloud listing has not
    registered it yet, parks the record in self.booted until it does.
    """
    cloud_node, arvados_node = self._actor_nodes(setup_proxy)
    del self.booting[setup_proxy.actor_ref.actor_urn]
    setup_proxy.stop()
    record = self.cloud_nodes.get(cloud_node.id)
    if record is None:
        record = self._new_node(cloud_node)
        self.booted[cloud_node.id] = record
    self._pair_nodes(record, arvados_node)
@_check_poll_freshness
def stop_booting_node(self):
    """Cancel one node that is still booting, when we have too many.

    Reschedules itself through the mailbox if more than one node is
    still in excess after a successful cancellation.
    """
    nodes_excess = self._nodes_excess()
    if (nodes_excess < 1) or not self.booting:
        return None
    # BUG FIX: iterating the dict while deleting from it raises
    # RuntimeError on Python 3 (and iteritems() is Python 2-only);
    # snapshot the items first.
    for key, node in list(self.booting.items()):
        node.stop_if_no_cloud_node().get()
        if not node.actor_ref.is_alive():
            del self.booting[key]
            if nodes_excess > 1:
                self._later.stop_booting_node()
            break
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Handle a monitor actor reporting its node is eligible for shutdown.

    Starts a shutdown actor for the node unless we still want the
    capacity or a shutdown is already in flight for that node.
    """
    if self._nodes_excess() < 1:
        return None
    cloud_node, arvados_node = self._actor_nodes(node_actor)
    if cloud_node.id in self.shutdowns:
        return None
    shutdown = self._node_shutdown.start(timer_actor=self._timer,
                                         cloud_client=self._new_cloud(),
                                         cloud_node=cloud_node).proxy()
    self.shutdowns[cloud_node.id] = shutdown
    shutdown.subscribe(self._later.node_finished_shutdown)
289 def node_finished_shutdown(self, shutdown_actor):
290 cloud_node_id = shutdown_actor.cloud_node.get().id
291 shutdown_actor.stop()
292 if cloud_node_id in self.booted:
293 self.booted.pop(cloud_node_id).actor.stop()
294 del self.shutdowns[cloud_node_id]
297 self._logger.info("Shutting down after signal.")
298 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
299 for bootnode in self.booting.itervalues():
300 bootnode.stop_if_no_cloud_node()
301 self._later.await_shutdown()
303 def await_shutdown(self):
304 if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
305 self._timer.schedule(time.time() + 1, self._later.await_shutdown)