2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
14 from . import computenode as cnode
16 from .computenode import dispatch
17 from .config import actor_class
class _ComputeNodeRecord(object):
    """Bookkeeping record that ties together what is known about one node.

    Attributes:
      actor -- handle on the node's monitor actor, or None
      cloud_node -- the cloud provider's node object, or None
      arvados_node -- the Arvados node record, or None
      assignment_time -- timestamp of the last time the Arvados record was
        handed to a booting node; -inf means it never was
      shutdown_actor -- handle on an in-progress shutdown actor, or None
    """
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # The daemon reads record.actor when pairing nodes and when
        # collecting node states, so the argument has to be stored.
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
        self.shutdown_actor = None
class _BaseNodeTracker(object):
    """Dictionary-backed registry of node records.

    Subclasses supply three class attributes:
      RECORD_ATTR -- name of the record attribute holding the tracked item
      PAIR_ATTR -- name of the record attribute holding the paired item
      item_key -- callable mapping a tracked item to its dictionary key

    self.nodes maps keys to live records.  self.orphans holds the records
    dropped by the most recent update_from() pass that ran to completion.
    """
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # The class-body loop below installs this result as a method, so
        # the wrapper must be handed back.
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        """Return the dictionary key under which record is stored."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Store record under its own key, replacing any previous entry."""
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        """Replace the tracked item on the record already stored at key."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Sync with a fresh listing, yielding (key, item) for new items.

        Items whose key is already tracked refresh the existing record.
        Items with an unknown key are yielded so the caller can register
        them.  This is a generator: self.orphans is only rebuilt (and the
        vanished records only removed from self.nodes) once the caller has
        consumed it to the end.
        """
        # Iterating the dict directly yields its keys on Python 2 and 3.
        unseen = set(self.nodes)
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Return a generator over records whose PAIR_ATTR is None."""
        return (record for record in self.nodes.values()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker whose records are keyed by the cloud node's id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Cloud node objects expose their identity as the .id attribute.
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracker whose records are keyed by the Arvados node's uuid."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    @staticmethod
    def _slot_sort_key(record):
        # Sort key that never compares None against a number: records
        # without a slot form their own group ahead of the numbered ones.
        slot = record.arvados_node['slot_number']
        return (slot is not None, slot)

    def find_stale_node(self, stale_time):
        """Return a reusable Arvados node record, or None if there is none.

        A record qualifies when neither its modification time nor its
        last assignment time is fresh with respect to stale_time.
        """
        # Try to select a stale node record that have an assigned slot first
        # NOTE(review): the sort direction and the 'and' joining the two
        # freshness tests were missing from the reviewed text; descending
        # order and 'and' are presumed from the comment above -- confirm.
        for record in sorted(self.nodes.values(),
                             key=self._slot_sort_key,
                             reverse=True):
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None
94 class NodeManagerDaemonActor(actor_class):
95 """Node Manager daemon.
This actor subscribes to all information polls about cloud nodes,
Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
for every cloud node, subscribing them to poll updates
appropriately.  It creates and destroys cloud nodes based on job queue
demand, and stops the corresponding ComputeNode actors when their work
is done.
"""
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
             cloud_nodes_actor, cloud_update_actor, timer_actor,
             arvados_factory, cloud_factory,
             shutdown_windows, server_calculator,
             min_nodes, max_nodes,
             poll_stale_after=600,
             boot_fail_after=1800,
             node_stale_after=7200,
             node_setup_class=dispatch.ComputeNodeSetupActor,
             node_shutdown_class=dispatch.ComputeNodeShutdownActor,
             node_actor_class=dispatch.ComputeNodeMonitorActor,
             max_total_price=0):
    """Wire the daemon to its pollers and set up empty node bookkeeping.

    The three *_actor pollers are subscribed to the matching update_*
    method of this daemon.  arvados_factory and cloud_factory are called
    to build API clients.  min_nodes, max_nodes and max_total_price bound
    how many nodes are kept; a falsy max_total_price disables the price
    cap.  The *_after arguments are ages in seconds.

    NOTE(review): the max_total_price parameter line was missing from the
    reviewed text; the default of 0 is presumed -- confirm with callers.
    """
    super(NodeManagerDaemonActor, self).__init__()
    self._node_setup = node_setup_class
    self._node_shutdown = node_shutdown_class
    self._node_actor = node_actor_class
    self._cloud_updater = cloud_update_actor
    self._timer = timer_actor
    self._new_arvados = arvados_factory
    self._new_cloud = cloud_factory
    self._cloud_driver = self._new_cloud()
    self._later = self.actor_ref.tell_proxy()
    self.shutdown_windows = shutdown_windows
    self.server_calculator = server_calculator
    self.min_cloud_size = self.server_calculator.cheapest_size()
    self.min_nodes = min_nodes
    self.max_nodes = max_nodes
    self.node_quota = max_nodes
    self.max_total_price = max_total_price
    self.poll_stale_after = poll_stale_after
    self.boot_fail_after = boot_fail_after
    self.node_stale_after = node_stale_after
    # Must exist before the loop below fills it in.
    self.last_polls = {}
    pollers = (('server_wishlist', server_wishlist_actor),
               ('arvados_nodes', arvados_nodes_actor),
               ('cloud_nodes', cloud_nodes_actor))
    for poll_name, poll_actor in pollers:
        poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
        setattr(self, '_{}_actor'.format(poll_name), poll_actor)
        # Start every poll out as stale so that nothing is booted or shut
        # down before each poller has reported at least once.
        self.last_polls[poll_name] = -self.poll_stale_after
    self.cloud_nodes = _CloudNodeTracker()
    self.arvados_nodes = _ArvadosNodeTracker()
    self.booting = {}       # Actor IDs to ComputeNodeSetupActors
    self.sizes_booting = {} # Actor IDs to node size
def on_start(self):
    """Create the daemon's logger once the actor has its URN.

    NOTE(review): the def line was missing from the reviewed text; the
    name on_start (pykka's start-up hook) is presumed -- confirm.
    """
    # The logger name is the class name plus the tail of the actor URN
    # (everything from character 33 on).
    self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
    self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
    """Record that the poll named poll_key reported just now."""
    now = time.time()
    self.last_polls[poll_key] = now
def _pair_nodes(self, node_record, arvados_node):
    """Attach arvados_node to a cloud node's record.

    The record's monitor actor is subscribed to updates for that Arvados
    node, and the record is also filed in the Arvados node tracker.
    """
    cloud_name = node_record.cloud_node.name
    node_uuid = arvados_node['uuid']
    hostname = arvados_node['hostname']
    self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
                      cloud_name, node_uuid, hostname)
    on_update = node_record.actor.update_arvados_node
    self._arvados_nodes_actor.subscribe_to(node_uuid, on_update)
    node_record.arvados_node = arvados_node
    self.arvados_nodes.add(node_record)
def _new_node(self, cloud_node):
    """Start a monitor actor for cloud_node and return its new record.

    The monitor is subscribed to cloud updates for this node, and the
    daemon's node_can_shutdown is subscribed to the monitor.
    """
    start_time = self._cloud_driver.node_start_time(cloud_node)
    shutdown_timer = cnode.ShutdownTimer(start_time,
                                         self.shutdown_windows)
    actor = self._node_actor.start(
        cloud_node=cloud_node,
        cloud_node_start_time=start_time,
        shutdown_timer=shutdown_timer,
        cloud_fqdn_func=self._cloud_driver.node_fqdn,
        update_actor=self._cloud_updater,
        timer_actor=self._timer,
        # NOTE(review): one keyword line was missing from the reviewed
        # text here; arvados_node=None is presumed (nothing is paired
        # yet) -- confirm against the monitor actor's signature.
        arvados_node=None,
        poll_stale_after=self.poll_stale_after,
        node_stale_after=self.node_stale_after,
        cloud_client=self._cloud_driver,
        boot_fail_after=self.boot_fail_after)
    actorTell = actor.tell_proxy()
    actorTell.subscribe(self._later.node_can_shutdown)
    self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                         actorTell.update_cloud_node)
    record = _ComputeNodeRecord(actor.proxy(), cloud_node)
    # _register_cloud_node() files the returned record in the tracker.
    return record
def _register_cloud_node(self, node):
    """Track a cloud node, creating a record only if it is unknown.

    A node id that is already tracked just has its cloud_node object
    refreshed; no second monitor actor is started for it.
    """
    rec = self.cloud_nodes.get(node.id)
    if rec is None:
        self._logger.info("Registering new cloud node %s", node.id)
        record = self._new_node(node)
        self.cloud_nodes.add(record)
    else:
        rec.cloud_node = node
def update_cloud_nodes(self, nodelist):
    """Handle a fresh cloud node listing.

    New nodes are registered, pairing is attempted, and records that
    dropped out of the listing are either put back (recently booted
    nodes) or have their actors stopped and their cloud node forgotten.
    """
    self._update_poll_time('cloud_nodes')
    for _, node in self.cloud_nodes.update_from(nodelist):
        self._register_cloud_node(node)

    # NOTE(review): this call was missing from the reviewed text; it is
    # presumed because nothing else visible invokes try_pairing().
    self.try_pairing()

    for record in self.cloud_nodes.orphans.values():
        if record.shutdown_actor:
            try:
                record.shutdown_actor.stop()
            except pykka.ActorDeadError:
                # Already stopped; nothing left to do.
                pass
            record.shutdown_actor = None

        # A recently booted node is a node that successfully completed the
        # setup actor but has not yet appeared in the cloud node list.
        # This will have the tag _nodemanager_recently_booted on it, which
        # means (if we're not shutting it down) we want to put it back into
        # the cloud node list.  Once it really appears in the cloud list,
        # the object in record.cloud_node will be replaced by a new one
        # that lacks the "_nodemanager_recently_booted" tag.
        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
            self.cloud_nodes.add(record)
        else:
            # Node disappeared from the cloud node list.  Stop the monitor
            # actor if necessary and forget about the node.
            if record.actor:
                try:
                    record.actor.stop()
                except pykka.ActorDeadError:
                    pass
                record.actor = None
            record.cloud_node = None
def _register_arvados_node(self, key, arv_node):
    """Start tracking an Arvados node that has no record yet."""
    self._logger.info("Registering new Arvados node %s", key)
    self.arvados_nodes.add(_ComputeNodeRecord(arvados_node=arv_node))
def update_arvados_nodes(self, nodelist):
    """Handle a fresh Arvados node listing, then retry pairing."""
    self._update_poll_time('arvados_nodes')
    for key, node in self.arvados_nodes.update_from(nodelist):
        self._register_arvados_node(key, node)
    # NOTE(review): this call was missing from the reviewed text; it is
    # presumed because nothing else visible invokes try_pairing().
    self.try_pairing()
def try_pairing(self):
    """Offer each unpaired Arvados node to each unpaired cloud node.

    A cloud record without a monitor actor is never offered anything.
    The first Arvados node that a monitor accepts is paired with it.
    """
    for record in self.cloud_nodes.unpaired():
        for arv_rec in self.arvados_nodes.unpaired():
            if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                self._pair_nodes(record, arv_rec.arvados_node)
                # This cloud node is paired now; stop offering it more
                # Arvados nodes and move on to the next cloud node.
                break
def _nodes_booting(self, size):
    """Count setup actors still booting a node of the given size.

    Passing size=None counts booting nodes of every size.
    """
    return sum(1
               for c in self.booting
               if size is None or self.sizes_booting[c].id == size.id)
def _node_states(self, size):
    """Return the list of state names for tracked cloud nodes.

    Only nodes of the given size are considered (all nodes when size is
    None).  A node with a shutdown actor, or without a monitor actor, is
    reported as "shutdown"; every other node is asked for its state.
    """
    proxy_states = []
    states = []
    for rec in self.cloud_nodes.nodes.values():
        if size is None or rec.cloud_node.size.id == size.id:
            if rec.shutdown_actor is None and rec.actor is not None:
                # Collect the pending replies and resolve them together
                # at the end instead of blocking on each node in turn.
                proxy_states.append(rec.actor.get_state())
            else:
                states.append("shutdown")
    return states + pykka.get_all(proxy_states)
def _update_tracker(self):
    """Publish per-state node counts and the wishlist length.

    Every existing 'nodes_*' tracker key is first reset to zero so that
    states which no longer have any node do not keep a stale count.
    """
    updates = {
        k: 0
        for k in status.tracker.keys()
        if k.startswith('nodes_')
    }
    for s in self._node_states(size=None):
        updates.setdefault('nodes_'+s, 0)
        updates['nodes_'+s] += 1
    updates['nodes_wish'] = len(self.last_wishlist)
    status.tracker.update(updates)
def _state_counts(self, size):
    """Return a dict of node counts per state for the given size.

    "booting" comes from the setup actors; the other counters tally the
    state names returned by _node_states().  A state name outside the
    known set raises KeyError.

    NOTE(review): the counter keys other than "booting" were missing
    from the reviewed text; they are presumed from the states the rest
    of the class reads -- confirm the set is complete.
    """
    states = self._node_states(size)
    counts = {
        "booting": self._nodes_booting(size),
        "unpaired": 0,
        "busy": 0,
        "idle": 0,
        "down": 0,
        "shutdown": 0
    }
    for s in states:
        counts[s] = counts[s] + 1
    return counts
def _nodes_up(self, counts):
    """Return how many nodes count as up: booting, unpaired, idle, busy."""
    up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
    return up
def _total_price(self):
    """Return the summed price of all booting and tracked cloud nodes."""
    cost = 0
    cost += sum(self.sizes_booting[c].price
                for c in self.booting)
    cost += sum(c.cloud_node.size.price
                for c in self.cloud_nodes.nodes.values())
    return cost
def _size_wishlist(self, size):
    """Return how many entries of the last wishlist ask for this size."""
    matching = [want for want in self.last_wishlist if want.id == size.id]
    return len(matching)
def _nodes_wanted(self, size):
    """Return how many more nodes of this size should be booted.

    A negative result means the total node count is at or over the
    current quota by that many nodes.  Otherwise the result is the
    shortfall against min_nodes (for the cheapest size only), or the
    wishlist demand not covered by non-busy up nodes, capped so that the
    total price stays within max_total_price when that cap is set.
    """
    total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
    under_min = self.min_nodes - total_node_count
    over_max = total_node_count - self.node_quota
    total_price = self._total_price()

    counts = self._state_counts(size)

    up_count = self._nodes_up(counts)
    busy_count = counts["busy"]
    wishlist_count = self._size_wishlist(size)

    self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
                      wishlist_count,
                      up_count,
                      counts["booting"],
                      counts["unpaired"],
                      counts["idle"],
                      busy_count,
                      counts["down"],
                      counts["shutdown"])

    if over_max >= 0:
        return -over_max
    elif under_min > 0 and size.id == self.min_cloud_size.id:
        return under_min

    wanted = wishlist_count - (up_count - busy_count)
    if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
        # Booting everything wanted would break the price cap; boot only
        # as many nodes as still fit under it.
        can_boot = int((self.max_total_price - total_price) / size.price)
        if can_boot == 0:
            self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                              size.name, size.price, self.max_total_price, total_price)
        return can_boot
    else:
        return wanted
def _nodes_excess(self, size):
    """Return how many up nodes of this size exceed current demand.

    Demand is the busy nodes plus the wishlist entries for the size.
    For the cheapest size, min_nodes of the up nodes are held back and
    not counted as excess.
    """
    tally = self._state_counts(size)
    surplus = self._nodes_up(tally)
    if size.id == self.min_cloud_size.id:
        surplus -= self.min_nodes
    demand = tally["busy"] + self._size_wishlist(size)
    return surplus - demand
def update_server_wishlist(self, wishlist):
    """Handle a fresh server wishlist.

    For every cloud size, in reverse of the calculator's order, a node
    start or a booting-node stop is queued depending on the sign of
    _nodes_wanted().  A failure for one size is logged and does not stop
    the other sizes from being processed.
    """
    self._update_poll_time('server_wishlist')
    self.last_wishlist = wishlist
    for size in reversed(self.server_calculator.cloud_sizes):
        try:
            nodes_wanted = self._nodes_wanted(size)
            if nodes_wanted > 0:
                self._later.start_node(size)
            elif (nodes_wanted < 0) and self.booting:
                self._later.stop_booting_node(size)
        except Exception:
            self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
    try:
        self._update_tracker()
    except Exception:
        # Status reporting is best effort; never let it break the daemon.
        self._logger.exception("while updating tracker")
def _check_poll_freshness(orig_func):
    """Decorator to inhibit a method when poll information is stale.

    This decorator checks the timestamps of all the poll information the
    daemon has received.  The decorated method is only called if none
    of the timestamps are considered stale.  When any poll is stale the
    call is skipped and None is returned.
    """
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        now = time.time()
        if all(now - t < self.poll_stale_after
               for t in self.last_polls.values()):
            return orig_func(self, *args, **kwargs)
        else:
            return None
    return wrapper
@_check_poll_freshness
def start_node(self, cloud_size):
    """Boot one node of cloud_size if one is still wanted.

    A stale Arvados node record is reused for the new node when one is
    available.  If more than one node is wanted, another start_node call
    is queued so that nodes are booted one message at a time.
    """
    nodes_wanted = self._nodes_wanted(cloud_size)
    if nodes_wanted < 1:
        return None
    arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
    self._logger.info("Want %i more %s nodes. Booting a node.",
                      nodes_wanted, cloud_size.name)
    new_setup = self._node_setup.start(
        timer_actor=self._timer,
        arvados_client=self._new_arvados(),
        arvados_node=arvados_node,
        cloud_client=self._new_cloud(),
        cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
    self.booting[new_setup.actor_ref.actor_urn] = new_setup
    self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size

    if arvados_node is not None:
        # Stamp the record so find_stale_node() does not hand it to a
        # second booting node right away.
        self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            time.time())
    new_setup.subscribe(self._later.node_setup_finished)
    if nodes_wanted > 1:
        self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes from an actor proxy in one round.

    Each attribute lookup is issued first; the collected results are
    then resolved together and returned in the order of attr_names.
    """
    pending = []
    for attr in attr_names:
        pending.append(getattr(actor, attr))
    return pykka.get_all(pending)
def node_setup_finished(self, setup_proxy):
    """Record the outcome of a finished setup actor.

    On failure with a quota error the node quota is lowered to the
    current node count.  On success the new cloud node is registered and
    the quota is allowed to grow by one, never beyond max_nodes.  Either
    way the setup actor is removed from the booting tables.
    """
    # Called when a SetupActor has completed.
    cloud_node, arvados_node, error = self._get_actor_attrs(
        setup_proxy, 'cloud_node', 'arvados_node', 'error')
    # NOTE(review): one statement was missing from the reviewed text
    # here; stopping the finished setup actor is presumed -- confirm.
    setup_proxy.stop()

    if cloud_node is None:
        # If cloud_node is None then the node create wasn't successful.
        if error == dispatch.QuotaExceeded:
            # We've hit a quota limit, so adjust node_quota to stop trying to
            # boot new nodes until the node count goes down.
            self.node_quota = len(self.cloud_nodes)
            self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
    else:
        # Node creation succeeded.  Update cloud node list.
        cloud_node._nodemanager_recently_booted = True
        self._register_cloud_node(cloud_node)

        # Different quota policies may in force depending on the cloud
        # provider, account limits, and the specific mix of nodes sizes
        # that are already created.  If we are right at the quota limit,
        # we want to probe to see if the last quota still applies or if we
        # are allowed to create more nodes.
        #
        # For example, if the quota is actually based on core count, the
        # quota might be 20 single-core machines or 10 dual-core machines.
        # If we previously set node_quota to 10 dual core machines, but are
        # now booting single core machines (actual quota 20), we want to
        # allow the quota to expand so we don't get stuck at 10 machines
        # forever.
        if len(self.cloud_nodes) >= self.node_quota:
            self.node_quota = len(self.cloud_nodes)+1
            self._logger.warning("After successful boot setting node quota to %s", self.node_quota)

    self.node_quota = min(self.node_quota, self.max_nodes)
    del self.booting[setup_proxy.actor_ref.actor_urn]
    del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
@_check_poll_freshness
def stop_booting_node(self, size):
    """Cancel one still-booting node of the given size if in excess.

    Only a setup actor that agrees to stop (it has no cloud node yet) is
    cancelled.  If more than one node is in excess, another
    stop_booting_node call is queued for the next one.
    """
    nodes_excess = self._nodes_excess(size)
    if (nodes_excess < 1) or not self.booting:
        return None
    # Iterate over a snapshot because entries are deleted inside the loop.
    for key, node in list(self.booting.items()):
        if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
            del self.booting[key]
            del self.sizes_booting[key]

            if nodes_excess > 1:
                self._later.stop_booting_node(size)
            # At most one setup actor is cancelled per call.
            break
def _begin_node_shutdown(self, node_actor, cancellable):
    """Start a shutdown actor for the node watched by node_actor.

    Nothing happens when the node's record already has a shutdown actor.
    The daemon's node_finished_shutdown is subscribed to the new actor.
    """
    cloud_node_obj = node_actor.cloud_node.get()
    cloud_node_id = cloud_node_obj.id
    record = self.cloud_nodes[cloud_node_id]
    if record.shutdown_actor is not None:
        # A shutdown is already in progress for this node.
        return None
    shutdown = self._node_shutdown.start(
        timer_actor=self._timer, cloud_client=self._new_cloud(),
        arvados_client=self._new_arvados(),
        node_monitor=node_actor.actor_ref, cancellable=cancellable)
    record.shutdown_actor = shutdown.proxy()
    shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """React to a monitor actor's suggestion that its node may shut down.

    The node is shut down when its size is in excess (cancellable), when
    it has no paired Arvados node, or when it is in the 'down' state
    (the latter two are not cancellable).
    """
    try:
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)
        elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
            # Node is unpaired, which means it probably exceeded its booting
            # grace period without a ping, so shut it down so we can boot a new
            # one.
            self._begin_node_shutdown(node_actor, cancellable=False)
        elif node_actor.in_state('down').get():
            # Node is down and unlikely to come back.
            self._begin_node_shutdown(node_actor, cancellable=False)
    except pykka.ActorDeadError as e:
        # The monitor actor sends shutdown suggestions every time the
        # node's state is updated, and these go into the daemon actor's
        # message queue.  It's possible that the node has already been shut
        # down (which shuts down the node monitor actor).  In that case,
        # this message is stale and we'll get ActorDeadError when we try to
        # access node_actor.  Log the error.
        self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
def node_finished_shutdown(self, shutdown_actor):
    """Clean up after a shutdown actor has finished.

    The shutdown actor is stopped and detached from the node's record.
    Only when the shutdown succeeded is the node's monitor actor stopped
    and the recently-booted tag removed from its cloud node.
    """
    try:
        cloud_node, success = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success')
    except pykka.ActorDeadError:
        # The shutdown actor is gone already; there is nothing to read.
        return
    cloud_node_id = cloud_node.id

    try:
        shutdown_actor.stop()
    except pykka.ActorDeadError:
        pass

    try:
        record = self.cloud_nodes[cloud_node_id]
    except KeyError:
        # Cloud node was already removed from the cloud node list
        # supposedly while the destroy_node call was finishing its
        # job.
        return
    record.shutdown_actor = None

    if not success:
        return

    # Shutdown was successful, so stop the monitor actor, otherwise it
    # will keep offering the node as a candidate for shutdown.
    if record.actor is not None:
        record.actor.stop()
        record.actor = None

    # If the node went from being booted to being shut down without ever
    # appearing in the cloud node list, it will have the
    # _nodemanager_recently_booted tag, so get rid of it so that the node
    # can be forgotten completely.
    if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
        del record.cloud_node._nodemanager_recently_booted
def shutdown(self):
    """Begin shutting the daemon down.

    Node starts and stops are inhibited, the pollers are stopped, the
    cloud node list is cleared, and idle setup actors are cancelled;
    await_shutdown() is then queued.

    NOTE(review): the def line was missing from the reviewed text; the
    method name is presumed from the log message -- confirm with callers.
    """
    self._logger.info("Shutting down after signal.")
    self.poll_stale_after = -1  # Inhibit starting/stopping nodes

    # Stop the three pollers so that no further updates arrive.
    self._server_wishlist_actor.stop()
    self._arvados_nodes_actor.stop()
    self._cloud_nodes_actor.stop()

    # Clear cloud node list
    self.update_cloud_nodes([])

    # Stop setup actors unless they are in the middle of setup.
    setup_stops = {key: node.stop_if_no_cloud_node()
                   for key, node in self.booting.items()}
    # Keep only the setup actors that declined to stop.
    self.booting = {key: self.booting[key]
                    for key in setup_stops if not setup_stops[key].get()}
    self._later.await_shutdown()
553 def await_shutdown(self):
555 self._timer.schedule(time.time() + 1, self._later.await_shutdown)