3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
22 self.shutdown_actor = None
24 class _BaseNodeTracker(object):
29 # Proxy the methods listed below to self.nodes.
30 def _proxy_method(name):
31 method = getattr(dict, name)
32 @functools.wraps(method, ('__name__', '__doc__'))
33 def wrapper(self, *args, **kwargs):
34 return method(self.nodes, *args, **kwargs)
37 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38 locals()[_method_name] = _proxy_method(_method_name)
40 def record_key(self, record):
41 return self.item_key(getattr(record, self.RECORD_ATTR))
43 def add(self, record):
44 self.nodes[self.record_key(record)] = record
46 def update_record(self, key, item):
47 setattr(self.nodes[key], self.RECORD_ATTR, item)
49 def update_from(self, response):
50 unseen = set(self.nodes.iterkeys())
52 key = self.item_key(item)
55 self.update_record(key, item)
58 self.orphans = {key: self.nodes.pop(key) for key in unseen}
61 return (record for record in self.nodes.itervalues()
62 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud node records, keyed by the cloud provider's node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by the node's UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        return arvados_node['uuid']
76 def find_stale_node(self, stale_time):
77 for record in self.nodes.itervalues():
78 node = record.arvados_node
79 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
81 not cnode.timestamp_fresh(record.assignment_time,
87 class NodeManagerDaemonActor(actor_class):
88 """Node Manager daemon.
90 This actor subscribes to all information polls about cloud nodes,
91 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
92 for every cloud node, subscribing them to poll updates
93 appropriately. It creates and destroys cloud nodes based on job queue
94 demand, and stops the corresponding ComputeNode actors when their work
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, server_calculator,
101 min_nodes, max_nodes,
102 poll_stale_after=600,
103 boot_fail_after=1800,
104 node_stale_after=7200,
105 node_setup_class=dispatch.ComputeNodeSetupActor,
106 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
107 node_actor_class=dispatch.ComputeNodeMonitorActor,
109 super(NodeManagerDaemonActor, self).__init__()
110 self._node_setup = node_setup_class
111 self._node_shutdown = node_shutdown_class
112 self._node_actor = node_actor_class
113 self._cloud_updater = cloud_update_actor
114 self._timer = timer_actor
115 self._new_arvados = arvados_factory
116 self._new_cloud = cloud_factory
117 self._cloud_driver = self._new_cloud()
118 self._later = self.actor_ref.tell_proxy()
119 self.shutdown_windows = shutdown_windows
120 self.server_calculator = server_calculator
121 self.min_cloud_size = self.server_calculator.cheapest_size()
122 self.min_nodes = min_nodes
123 self.max_nodes = max_nodes
124 self.max_total_price = max_total_price
125 self.poll_stale_after = poll_stale_after
126 self.boot_fail_after = boot_fail_after
127 self.node_stale_after = node_stale_after
129 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
130 poll_actor = locals()[poll_name + '_actor']
131 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
132 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
133 self.last_polls[poll_name] = -self.poll_stale_after
134 self.cloud_nodes = _CloudNodeTracker()
135 self.arvados_nodes = _ArvadosNodeTracker()
136 self.booting = {} # Actor IDs to ComputeNodeSetupActors
137 self.sizes_booting = {} # Actor IDs to node size
140 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
141 self._logger.debug("Daemon started")
143 def _update_poll_time(self, poll_key):
144 self.last_polls[poll_key] = time.time()
146 def _pair_nodes(self, node_record, arvados_node):
147 self._logger.info("Cloud node %s is now paired with Arvados node %s",
148 node_record.cloud_node.name, arvados_node['uuid'])
149 self._arvados_nodes_actor.subscribe_to(
150 arvados_node['uuid'], node_record.actor.update_arvados_node)
151 node_record.arvados_node = arvados_node
152 self.arvados_nodes.add(node_record)
154 def _new_node(self, cloud_node):
155 start_time = self._cloud_driver.node_start_time(cloud_node)
156 shutdown_timer = cnode.ShutdownTimer(start_time,
157 self.shutdown_windows)
158 actor = self._node_actor.start(
159 cloud_node=cloud_node,
160 cloud_node_start_time=start_time,
161 shutdown_timer=shutdown_timer,
162 cloud_fqdn_func=self._cloud_driver.node_fqdn,
163 update_actor=self._cloud_updater,
164 timer_actor=self._timer,
166 poll_stale_after=self.poll_stale_after,
167 node_stale_after=self.node_stale_after,
168 cloud_client=self._cloud_driver,
169 boot_fail_after=self.boot_fail_after)
170 actorTell = actor.tell_proxy()
171 actorTell.subscribe(self._later.node_can_shutdown)
172 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
173 actorTell.update_cloud_node)
174 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
177 def _register_cloud_node(self, node):
178 rec = self.cloud_nodes.get(node.id)
180 self._logger.info("Registering new cloud node %s", node.id)
181 record = self._new_node(node)
182 self.cloud_nodes.add(record)
184 rec.cloud_node = node
186 def update_cloud_nodes(self, nodelist):
187 self._update_poll_time('cloud_nodes')
188 for _, node in self.cloud_nodes.update_from(nodelist):
189 self._register_cloud_node(node)
193 for record in self.cloud_nodes.orphans.itervalues():
194 if record.shutdown_actor:
196 record.shutdown_actor.stop()
197 except pykka.ActorDeadError:
199 record.shutdown_actor = None
201 # A recently booted node is a node that successfully completed the
202 # setup actor but has not yet appeared in the cloud node list.
203 # This will have the tag _nodemanager_recently_booted on it, which
204 # means (if we're not shutting it down) we want to put it back into
205 # the cloud node list. Once it really appears in the cloud list,
206 # the object in record.cloud_node will be replaced by a new one
207 # that lacks the "_nodemanager_recently_booted" tag.
208 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
209 self.cloud_nodes.add(record)
211 # Node disappeared from the cloud node list. Stop the monitor
212 # actor if necessary and forget about the node.
216 except pykka.ActorDeadError:
219 record.cloud_node = None
221 def _register_arvados_node(self, key, arv_node):
222 self._logger.info("Registering new Arvados node %s", key)
223 record = _ComputeNodeRecord(arvados_node=arv_node)
224 self.arvados_nodes.add(record)
226 def update_arvados_nodes(self, nodelist):
227 self._update_poll_time('arvados_nodes')
228 for key, node in self.arvados_nodes.update_from(nodelist):
229 self._register_arvados_node(key, node)
232 def try_pairing(self):
233 for record in self.cloud_nodes.unpaired():
234 for arv_rec in self.arvados_nodes.unpaired():
235 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
236 self._pair_nodes(record, arv_rec.arvados_node)
239 def _nodes_booting(self, size):
241 for c in self.booting.iterkeys()
242 if size is None or self.sizes_booting[c].id == size.id)
245 def _node_states(self, size):
246 states = pykka.get_all(rec.actor.get_state()
247 for rec in self.cloud_nodes.nodes.itervalues()
248 if ((size is None or rec.cloud_node.size.id == size.id) and
249 rec.shutdown_actor is None))
250 states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
251 if ((size is None or rec.cloud_node.size.id == size.id) and
252 rec.shutdown_actor is not None)]
255 def _state_counts(self, size):
256 states = self._node_states(size)
258 "booting": self._nodes_booting(size),
266 counts[s] = counts[s] + 1
269 def _nodes_up(self, counts):
270 up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
273 def _total_price(self):
275 cost += sum(self.sizes_booting[c].price
276 for c in self.booting.iterkeys())
277 cost += sum(c.cloud_node.size.price
278 for c in self.cloud_nodes.nodes.itervalues())
281 def _size_wishlist(self, size):
282 return sum(1 for c in self.last_wishlist if c.id == size.id)
284 def _nodes_wanted(self, size):
285 total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
286 under_min = self.min_nodes - total_node_count
287 over_max = total_node_count - self.max_nodes
288 total_price = self._total_price()
290 counts = self._state_counts(size)
292 up_count = self._nodes_up(counts)
293 busy_count = counts["busy"]
295 self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
296 self._size_wishlist(size),
307 elif under_min > 0 and size.id == self.min_cloud_size.id:
310 wanted = self._size_wishlist(size) - (up_count - busy_count)
311 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
312 can_boot = int((self.max_total_price - total_price) / size.price)
314 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
315 size.name, size.price, self.max_total_price, total_price)
320 def _nodes_excess(self, size):
321 counts = self._state_counts(size)
322 up_count = self._nodes_up(counts)
323 if size.id == self.min_cloud_size.id:
324 up_count -= self.min_nodes
325 return up_count - (counts["busy"] + self._size_wishlist(size))
327 def update_server_wishlist(self, wishlist):
328 self._update_poll_time('server_wishlist')
329 self.last_wishlist = wishlist
330 for size in reversed(self.server_calculator.cloud_sizes):
332 nodes_wanted = self._nodes_wanted(size)
334 self._later.start_node(size)
335 elif (nodes_wanted < 0) and self.booting:
336 self._later.stop_booting_node(size)
337 except Exception as e:
338 self._logger.exception("while calculating nodes wanted for size %s", size)
340 def _check_poll_freshness(orig_func):
341 """Decorator to inhibit a method when poll information is stale.
343 This decorator checks the timestamps of all the poll information the
344 daemon has received. The decorated method is only called if none
345 of the timestamps are considered stale.
347 @functools.wraps(orig_func)
348 def wrapper(self, *args, **kwargs):
350 if all(now - t < self.poll_stale_after
351 for t in self.last_polls.itervalues()):
352 return orig_func(self, *args, **kwargs)
357 @_check_poll_freshness
358 def start_node(self, cloud_size):
359 nodes_wanted = self._nodes_wanted(cloud_size)
362 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
363 self._logger.info("Want %i more %s nodes. Booting a node.",
364 nodes_wanted, cloud_size.name)
365 new_setup = self._node_setup.start(
366 timer_actor=self._timer,
367 arvados_client=self._new_arvados(),
368 arvados_node=arvados_node,
369 cloud_client=self._new_cloud(),
370 cloud_size=cloud_size).proxy()
371 self.booting[new_setup.actor_ref.actor_urn] = new_setup
372 self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
374 if arvados_node is not None:
375 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
377 new_setup.subscribe(self._later.node_up)
379 self._later.start_node(cloud_size)
381 def _get_actor_attrs(self, actor, *attr_names):
382 return pykka.get_all([getattr(actor, name) for name in attr_names])
384 def node_up(self, setup_proxy):
385 # Called when a SetupActor has completed.
386 cloud_node, arvados_node = self._get_actor_attrs(
387 setup_proxy, 'cloud_node', 'arvados_node')
390 # If cloud_node is None then the node create wasn't
391 # successful and so there isn't anything to do.
392 if cloud_node is not None:
393 # Node creation succeeded. Update cloud node list.
394 cloud_node._nodemanager_recently_booted = True
395 self._register_cloud_node(cloud_node)
396 del self.booting[setup_proxy.actor_ref.actor_urn]
397 del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
399 @_check_poll_freshness
400 def stop_booting_node(self, size):
401 nodes_excess = self._nodes_excess(size)
402 if (nodes_excess < 1) or not self.booting:
404 for key, node in self.booting.iteritems():
405 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
406 del self.booting[key]
407 del self.sizes_booting[key]
410 self._later.stop_booting_node(size)
413 def _begin_node_shutdown(self, node_actor, cancellable):
414 cloud_node_obj = node_actor.cloud_node.get()
415 cloud_node_id = cloud_node_obj.id
416 record = self.cloud_nodes[cloud_node_id]
417 if record.shutdown_actor is not None:
419 shutdown = self._node_shutdown.start(
420 timer_actor=self._timer, cloud_client=self._new_cloud(),
421 arvados_client=self._new_arvados(),
422 node_monitor=node_actor.actor_ref, cancellable=cancellable)
423 record.shutdown_actor = shutdown.proxy()
424 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
426 @_check_poll_freshness
427 def node_can_shutdown(self, node_actor):
428 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
429 self._begin_node_shutdown(node_actor, cancellable=True)
430 elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
431 # Node is unpaired, which means it probably exceeded its booting
432 # grace period without a ping, so shut it down so we can boot a new
434 self._begin_node_shutdown(node_actor, cancellable=False)
435 elif node_actor.in_state('down').get():
436 # Node is down and unlikely to come back.
437 self._begin_node_shutdown(node_actor, cancellable=False)
439 def node_finished_shutdown(self, shutdown_actor):
441 cloud_node, success = self._get_actor_attrs(
442 shutdown_actor, 'cloud_node', 'success')
443 except pykka.ActorDeadError:
445 cloud_node_id = cloud_node.id
446 record = self.cloud_nodes[cloud_node_id]
447 shutdown_actor.stop()
448 record.shutdown_actor = None
453 # Shutdown was successful, so stop the monitor actor, otherwise it
454 # will keep offering the node as a candidate for shutdown.
458 # If the node went from being booted to being shut down without ever
459 # appearing in the cloud node list, it will have the
460 # _nodemanager_recently_booted tag, so get rid of it so that the node
461 # can be forgotten completely.
462 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
463 del record.cloud_node._nodemanager_recently_booted
466 self._logger.info("Shutting down after signal.")
467 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
468 setup_stops = {key: node.stop_if_no_cloud_node()
469 for key, node in self.booting.iteritems()}
470 self.booting = {key: self.booting[key]
471 for key in setup_stops if not setup_stops[key].get()}
472 self._later.await_shutdown()
474 def await_shutdown(self):
476 self._timer.schedule(time.time() + 1, self._later.await_shutdown)