3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
class _ComputeNodeRecord(object):
    # Bundles everything the daemon knows about one compute node: its
    # monitor actor, the cloud provider's node object, the matching
    # Arvados node record, and when an Arvados record was assigned.
    # NOTE(review): the line storing the `actor` argument (presumably
    # ``self.actor = actor``, file line 18) is missing from this view —
    # confirm against the full file.
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # -inf default means "never assigned", so any freshness check
        # against a real timestamp treats it as stale.
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    # Common bookkeeping for the cloud-node and Arvados-node trackers.
    # Subclasses define RECORD_ATTR (which attribute of a record this
    # tracker keys on), PAIR_ATTR (the opposite attribute), and item_key
    # (how to derive a dict key from a tracked item).
    # NOTE(review): several lines of this class (its __init__, the
    # 'def __len__' and 'def unpaired' headers, and part of update_from)
    # are missing from this view of the file.

    def __getitem__(self, key):
        return self.nodes[key]

    # NOTE(review): body of __len__; its 'def' line (file line 32) is
    # not visible in this chunk.
        return len(self.nodes)

    def get(self, key, default=None):
        return self.nodes.get(key, default)

    def record_key(self, record):
        # Key for a full record, derived from its tracked item.
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        # Replace the tracked item on an existing record in place.
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        # Reconcile tracked records against a fresh poll listing; any
        # key not seen again is moved into self.orphans.
        unseen = set(self.nodes.iterkeys())  # Python 2 dict iteration
        # NOTE(review): the loop header over `response` and the
        # known-vs-new branching (file lines 49, 51-52) are missing here.
            key = self.item_key(item)
            self.update_record(key, item)
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    # NOTE(review): 'def unpaired(self):' header (file lines 57-58) is
    # not visible here; this generator yields records whose opposite
    # (PAIR_ATTR) side has not been paired yet.
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud node records, keyed by the provider-assigned node id.

    A cloud record's pair is the matching Arvados node, so a record is
    "unpaired" until its ``arvados_node`` attribute is filled in.
    """
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    # Mirror image of _CloudNodeTracker: records are keyed by the
    # Arvados node UUID, and are "paired" once a cloud node is known.
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Look for an Arvados node record whose server-side mtime and
        # local assignment time both look stale, so it can be reused for
        # a new cloud node.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                # NOTE(review): the remainder of this condition and the
                # return statements (file lines 78, 80-82) are missing
                # from this view.
                not cnode.timestamp_fresh(record.assignment_time,
# NOTE(review): the class body continues far past this header; the
# docstring's closing line (file lines 93-94) is not visible in this
# chunk, so it has been closed here for readability.
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, min_nodes, max_nodes,
                 poll_stale_after=600, node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor):
        # Wire up collaborating actors and factories.  The *_class
        # keyword arguments exist mainly so tests can inject stubs.
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        # Proxy to self: calling through self._later delivers the call
        # as an actor message instead of running it synchronously.
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        # NOTE(review): the initialization of self.last_polls (file line
        # 119, presumably ``self.last_polls = {}``) is missing from this
        # view; the loop below writes into it.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            # Seed so the freshness check fails until a real poll lands.
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}   # Actor IDs to ComputeNodeSetupActors
        self.booted = {}    # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")
132 def _update_poll_time(self, poll_key):
133 self.last_polls[poll_key] = time.time()
135 def _pair_nodes(self, node_record, arvados_node):
136 self._logger.info("Cloud node %s has associated with Arvados node %s",
137 node_record.cloud_node.id, arvados_node['uuid'])
138 self._arvados_nodes_actor.subscribe_to(
139 arvados_node['uuid'], node_record.actor.update_arvados_node)
140 node_record.arvados_node = arvados_node
141 self.arvados_nodes.add(node_record)
    def _new_node(self, cloud_node):
        # Start a ComputeNodeMonitorActor for *cloud_node*, subscribe it
        # to cloud-node poll updates, and build its tracking record.
        # NOTE(review): one keyword-argument line of the start() call
        # (file line 153) and the trailing ``return record`` (file lines
        # 160-161) are missing from this view.
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after).proxy()
        # Shutdown eligibility comes back to us as an actor message.
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
    def update_cloud_nodes(self, nodelist):
        # Handle a fresh cloud node listing poll: register newly seen
        # nodes (adopting records for ones we booted ourselves) and try
        # to pair them with unpaired Arvados records.
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            # NOTE(review): the 'else:' introducing this branch (file
            # line 168) is missing from this view.
            record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    # NOTE(review): a 'break' (file line 174) appears to
                    # be missing from this view.
        # Nodes that disappeared from the listing: drop any pending
        # shutdown actor reference for them.
        # NOTE(review): file line 176 (presumably stopping the orphan's
        # monitor actor) is missing from this view.
        for key, record in self.cloud_nodes.orphans.iteritems():
            self.shutdowns.pop(key, None)
    def update_arvados_nodes(self, nodelist):
        # Handle a fresh Arvados node listing poll: create records for
        # newly seen nodes, then offer every unpaired Arvados node to
        # every unpaired cloud node's monitor actor.
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                # .get() blocks on the monitor actor's answer.
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    # NOTE(review): a 'break' (file line 190) appears to
                    # be missing from this view.
    # NOTE(review): the 'def _nodes_up(self):' header (file line 192) is
    # missing from this view; this is its body.  It counts every node
    # the daemon considers "up": tracked cloud nodes plus nodes still
    # booting or booted but not yet seen in a cloud listing.
        return sum(len(nodelist) for nodelist in
                   [self.cloud_nodes, self.booted, self.booting])
    def _nodes_busy(self):
        # Count monitored nodes that are not idle.  in_state('idle')
        # returns futures; pykka.get_all resolves them in one batch.
        # NOTE(review): the generator's trailing condition (file lines
        # 200-201, presumably "if idle is False)") is missing from this
        # view.
        return sum(1 for idle in
                   pykka.get_all(rec.actor.in_state('idle') for rec in
                                 self.cloud_nodes.nodes.itervalues())
    def _nodes_wanted(self):
        # How many additional nodes the wishlist calls for (may go
        # negative when we have more than needed).
        up_count = self._nodes_up()
        over_max = up_count - self.max_nodes
        # NOTE(review): a branch handling over_max (file lines 205-207)
        # is missing from this view.
        # Busy nodes and nodes already shutting down can't take new
        # wishlist work, so exclude them from capacity.
        up_count -= len(self.shutdowns) + self._nodes_busy()
        return len(self.last_wishlist) - up_count
    def _nodes_excess(self):
        # How many idle nodes exceed what the wishlist requires,
        # ignoring nodes already shutting down.
        up_count = self._nodes_up() - len(self.shutdowns)
        over_min = up_count - self.min_nodes
        # NOTE(review): a guard branch using over_min (file lines
        # 214-216, presumably enforcing min_nodes) is missing from this
        # view.
        return up_count - self._nodes_busy() - len(self.last_wishlist)
    def update_server_wishlist(self, wishlist):
        # Handle a fresh job-queue wishlist poll and schedule a node
        # start or boot-cancel via the actor proxy as appropriate.
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        nodes_wanted = self._nodes_wanted()
        # NOTE(review): the condition guarding this call (file line 223,
        # presumably "if nodes_wanted > 0:") is missing from this view.
            self._later.start_node()
        elif (nodes_wanted < 0) and self.booting:
            self._later.stop_booting_node()
    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        # NOTE(review): the docstring's original closing line (file line
        # 234) is missing from this view; it has been closed above for
        # readability.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # NOTE(review): "now = time.time()" (file line 237) appears
            # to be missing from this view.
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            # NOTE(review): the stale-poll else branch and the
            # "return wrapper" (file lines 241-243) are missing from
            # this view.
    @_check_poll_freshness
    def start_node(self):
        # Boot one new compute node, sized from the wishlist, reusing a
        # stale Arvados node record when one is available.
        nodes_wanted = self._nodes_wanted()
        # NOTE(review): an early-return guard (file lines 248-249,
        # presumably "if nodes_wanted < 1: return None") is missing from
        # this view.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        # Size the node from the tail of the wishlist.
        cloud_size = self.last_wishlist[nodes_wanted - 1]
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        # Track the setup actor by its URN until the node comes up.
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Stamp the record so it is not handed out again as stale.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            # NOTE(review): the assigned value (file line 263, presumably
            # "time.time())") is missing from this view.
        new_setup.subscribe(self._later.node_up)
        # NOTE(review): the guard before this recursive message (file
        # line 265, presumably "if nodes_wanted > 1:") is missing from
        # this view.
            self._later.start_node()
268 def _get_actor_attrs(self, actor, *attr_names):
269 return pykka.get_all([getattr(actor, name) for name in attr_names])
    def node_up(self, setup_proxy):
        # A setup actor finished booting a node: adopt the cloud node
        # and pair it with the Arvados node the setup actor used.
        cloud_node, arvados_node = self._get_actor_attrs(
            setup_proxy, 'cloud_node', 'arvados_node')
        del self.booting[setup_proxy.actor_ref.actor_urn]
        # NOTE(review): file line 275 (presumably "setup_proxy.stop()")
        # is missing from this view.
        record = self.cloud_nodes.get(cloud_node.id)
        # NOTE(review): the condition introducing this branch (file line
        # 277, presumably "if record is None:") is missing from this
        # view.
            record = self._new_node(cloud_node)
            # Park the record until the node shows up in a cloud poll.
            self.booted[cloud_node.id] = record
        self._pair_nodes(record, arvados_node)
    @_check_poll_freshness
    def stop_booting_node(self):
        # Cancel one node that is still being set up, if we have excess
        # capacity.
        nodes_excess = self._nodes_excess()
        if (nodes_excess < 1) or not self.booting:
            # NOTE(review): this guard's body (file line 286, presumably
            # "return None") is missing from this view.
        for key, node in self.booting.iteritems():
            node.stop_if_no_cloud_node().get()
            if not node.actor_ref.is_alive():
                del self.booting[key]
                # NOTE(review): file line 291 (presumably a 'break') is
                # missing from this view; without it, deleting while
                # iterating iteritems() would raise RuntimeError.
        # Retry in case more than one booting node should be stopped.
            self._later.stop_booting_node()
    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        # A monitor actor reports its node is eligible for shutdown;
        # start a shutdown actor for it unless one already exists or we
        # have no excess capacity.
        if self._nodes_excess() < 1:
            # NOTE(review): this guard's body (file line 298, presumably
            # "return None") is missing from this view.
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            # NOTE(review): this guard's body (file line 301, presumably
            # "return None") is missing from this view.
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            node_monitor=node_actor.actor_ref).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)
    def node_finished_shutdown(self, shutdown_actor):
        # A shutdown actor reported its outcome: stop it and clean up
        # the bookkeeping for that cloud node.
        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
            # NOTE(review): the second attribute name (file line 310,
            # presumably "'cloud_node')") is missing from this view.
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        # NOTE(review): the condition introducing this first branch
        # (file line 313, presumably "if not success:") is missing from
        # this view.
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            # Node never appeared in a cloud listing; stop its monitor.
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
    # NOTE(review): the 'def shutdown(self):' header (file line 319) is
    # missing from this view; this is its body.  It begins a graceful
    # daemon shutdown after a signal.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        for bootnode in self.booting.itervalues():
            bootnode.stop_if_no_cloud_node()
        self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check once a second until every setup actor has stopped.
        # NOTE(review): the else branch (file lines 329-330, presumably
        # stopping this daemon actor) is past the end of this view.
        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)