3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud provider nodes, keyed by the provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a reusable stale Arvados node record, or None.

        A node is considered stale when both its API record's
        modification time and its last assignment time are older than
        stale_time seconds.
        """
        # .values() instead of Python 2-only .itervalues().
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                  not cnode.timestamp_fresh(record.assignment_time,
                                            stale_time)):
                return node
        return None
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
100 demand, and stops the corresponding ComputeNode actors when their work
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor):
114 super(NodeManagerDaemonActor, self).__init__()
115 self._node_setup = node_setup_class
116 self._node_shutdown = node_shutdown_class
117 self._node_actor = node_actor_class
118 self._cloud_updater = cloud_update_actor
119 self._timer = timer_actor
120 self._new_arvados = arvados_factory
121 self._new_cloud = cloud_factory
122 self._cloud_driver = self._new_cloud()
123 self._logger = logging.getLogger('arvnodeman.daemon')
124 self._later = self.actor_ref.proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.poll_stale_after = poll_stale_after
131 self.boot_fail_after = boot_fail_after
132 self.node_stale_after = node_stale_after
134 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
135 poll_actor = locals()[poll_name + '_actor']
136 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
137 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
138 self.last_polls[poll_name] = -self.poll_stale_after
139 self.cloud_nodes = _CloudNodeTracker()
140 self.arvados_nodes = _ArvadosNodeTracker()
141 self.booting = {} # Actor IDs to ComputeNodeSetupActors
142 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
143 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
144 self._logger.debug("Daemon initialized")
    def _update_poll_time(self, poll_key):
        """Record that fresh poll data for poll_key arrived just now."""
        self.last_polls[poll_key] = time.time()
149 def _pair_nodes(self, node_record, arvados_node):
150 self._logger.info("Cloud node %s has associated with Arvados node %s",
151 node_record.cloud_node.id, arvados_node['uuid'])
152 self._arvados_nodes_actor.subscribe_to(
153 arvados_node['uuid'], node_record.actor.update_arvados_node)
154 node_record.arvados_node = arvados_node
155 self.arvados_nodes.add(node_record)
157 def _new_node(self, cloud_node):
158 start_time = self._cloud_driver.node_start_time(cloud_node)
159 shutdown_timer = cnode.ShutdownTimer(start_time,
160 self.shutdown_windows)
161 actor = self._node_actor.start(
162 cloud_node=cloud_node,
163 cloud_node_start_time=start_time,
164 shutdown_timer=shutdown_timer,
165 cloud_fqdn_func=self._cloud_driver.node_fqdn,
166 update_actor=self._cloud_updater,
167 timer_actor=self._timer,
169 poll_stale_after=self.poll_stale_after,
170 node_stale_after=self.node_stale_after,
171 cloud_client=self._cloud_driver,
172 boot_fail_after=self.boot_fail_after).proxy()
173 actor.subscribe(self._later.node_can_shutdown)
174 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
175 actor.update_cloud_node)
176 record = _ComputeNodeRecord(actor, cloud_node)
179 def update_cloud_nodes(self, nodelist):
180 self._update_poll_time('cloud_nodes')
181 for key, node in self.cloud_nodes.update_from(nodelist):
182 self._logger.info("Registering new cloud node %s", key)
183 if key in self.booted:
184 record = self.booted.pop(key)
186 record = self._new_node(node)
187 self.cloud_nodes.add(record)
188 for arv_rec in self.arvados_nodes.unpaired():
189 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
190 self._pair_nodes(record, arv_rec.arvados_node)
192 for key, record in self.cloud_nodes.orphans.iteritems():
193 if key in self.shutdowns:
195 self.shutdowns[key].stop().get()
196 except pykka.ActorDeadError:
198 del self.shutdowns[key]
200 record.cloud_node = None
202 def update_arvados_nodes(self, nodelist):
203 self._update_poll_time('arvados_nodes')
204 for key, node in self.arvados_nodes.update_from(nodelist):
205 self._logger.info("Registering new Arvados node %s", key)
206 record = _ComputeNodeRecord(arvados_node=node)
207 self.arvados_nodes.add(record)
208 for arv_rec in self.arvados_nodes.unpaired():
209 arv_node = arv_rec.arvados_node
210 for cloud_rec in self.cloud_nodes.unpaired():
211 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
212 self._pair_nodes(cloud_rec, arv_node)
216 return sum(len(nodelist) for nodelist in
217 [self.cloud_nodes, self.booted, self.booting])
219 def _nodes_busy(self):
220 return sum(1 for busy in
221 pykka.get_all(rec.actor.in_state('busy') for rec in
222 self.cloud_nodes.nodes.itervalues())
225 def _nodes_missing(self):
226 return sum(1 for arv_node in
227 pykka.get_all(rec.actor.arvados_node for rec in
228 self.cloud_nodes.nodes.itervalues()
229 if rec.actor.cloud_node.get().id not in self.shutdowns)
230 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
232 def _nodes_wanted(self):
233 up_count = self._nodes_up()
234 under_min = self.min_nodes - up_count
235 over_max = up_count - self.max_nodes
241 up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
242 return len(self.last_wishlist) - up_count
244 def _nodes_excess(self):
245 up_count = self._nodes_up() - len(self.shutdowns)
246 over_min = up_count - self.min_nodes
250 return up_count - self._nodes_busy() - len(self.last_wishlist)
252 def update_server_wishlist(self, wishlist):
253 self._update_poll_time('server_wishlist')
254 self.last_wishlist = wishlist
255 nodes_wanted = self._nodes_wanted()
257 self._later.start_node()
258 elif (nodes_wanted < 0) and self.booting:
259 self._later.stop_booting_node()
261 def _check_poll_freshness(orig_func):
262 """Decorator to inhibit a method when poll information is stale.
264 This decorator checks the timestamps of all the poll information the
265 daemon has received. The decorated method is only called if none
266 of the timestamps are considered stale.
268 @functools.wraps(orig_func)
269 def wrapper(self, *args, **kwargs):
271 if all(now - t < self.poll_stale_after
272 for t in self.last_polls.itervalues()):
273 return orig_func(self, *args, **kwargs)
278 @_check_poll_freshness
279 def start_node(self):
280 nodes_wanted = self._nodes_wanted()
283 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
285 cloud_size = self.last_wishlist[self._nodes_up()]
287 cloud_size = self.min_cloud_size
288 self._logger.info("Want %s more nodes. Booting a %s node.",
289 nodes_wanted, cloud_size.name)
290 new_setup = self._node_setup.start(
291 timer_actor=self._timer,
292 arvados_client=self._new_arvados(),
293 arvados_node=arvados_node,
294 cloud_client=self._new_cloud(),
295 cloud_size=cloud_size).proxy()
296 self.booting[new_setup.actor_ref.actor_urn] = new_setup
297 if arvados_node is not None:
298 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
300 new_setup.subscribe(self._later.node_up)
302 self._later.start_node()
304 def _get_actor_attrs(self, actor, *attr_names):
305 return pykka.get_all([getattr(actor, name) for name in attr_names])
307 def node_up(self, setup_proxy):
308 cloud_node = setup_proxy.cloud_node.get()
309 del self.booting[setup_proxy.actor_ref.actor_urn]
311 record = self.cloud_nodes.get(cloud_node.id)
313 record = self._new_node(cloud_node)
314 self.booted[cloud_node.id] = record
315 self._timer.schedule(time.time() + self.boot_fail_after,
316 self._later.shutdown_unpaired_node, cloud_node.id)
318 @_check_poll_freshness
319 def stop_booting_node(self):
320 nodes_excess = self._nodes_excess()
321 if (nodes_excess < 1) or not self.booting:
323 for key, node in self.booting.iteritems():
324 if node.stop_if_no_cloud_node().get():
325 del self.booting[key]
327 self._later.stop_booting_node()
330 def _begin_node_shutdown(self, node_actor, cancellable):
331 cloud_node_id = node_actor.cloud_node.get().id
332 if cloud_node_id in self.shutdowns:
334 shutdown = self._node_shutdown.start(
335 timer_actor=self._timer, cloud_client=self._new_cloud(),
336 arvados_client=self._new_arvados(),
337 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
338 self.shutdowns[cloud_node_id] = shutdown
339 shutdown.subscribe(self._later.node_finished_shutdown)
341 @_check_poll_freshness
342 def node_can_shutdown(self, node_actor):
343 if self._nodes_excess() > 0:
344 self._begin_node_shutdown(node_actor, cancellable=True)
346 def shutdown_unpaired_node(self, cloud_node_id):
347 for record_dict in [self.cloud_nodes, self.booted]:
348 if cloud_node_id in record_dict:
349 record = record_dict[cloud_node_id]
353 if not record.actor.in_state('idle', 'busy').get():
354 self._begin_node_shutdown(record.actor, cancellable=False)
356 def node_finished_shutdown(self, shutdown_actor):
357 cloud_node, success, cancel_reason = self._get_actor_attrs(
358 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
359 shutdown_actor.stop()
360 cloud_node_id = cloud_node.id
362 if cancel_reason == self._node_shutdown.NODE_BROKEN:
363 self.cloud_nodes.blacklist(cloud_node_id)
364 del self.shutdowns[cloud_node_id]
365 elif cloud_node_id in self.booted:
366 self.booted.pop(cloud_node_id).actor.stop()
367 del self.shutdowns[cloud_node_id]
370 self._logger.info("Shutting down after signal.")
371 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
372 setup_stops = {key: node.stop_if_no_cloud_node()
373 for key, node in self.booting.iteritems()}
374 self.booting = {key: self.booting[key]
375 for key in setup_stops if not setup_stops[key].get()}
376 self._later.await_shutdown()
378 def await_shutdown(self):
380 self._timer.schedule(time.time() + 1, self._later.await_shutdown)