# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: AGPL-3.0

from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from . import status
from .computenode import dispatch
from .config import actor_class


class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
        self.shutdown_actor = None
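

# The trackers below hold _ComputeNodeRecord objects in a dict keyed by the
# identity of the item they track (a cloud node ID or an Arvados node UUID).
# update_from() reconciles that dict with a fresh poll response: existing
# records are updated in place, new items are yielded to the caller, and
# records whose keys disappeared are collected in self.orphans.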
class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)


class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
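
    # Return an Arvados node record whose last ping and last assignment to a
    # cloud node are both older than stale_time, i.e. one that looks safe to
    # reuse for a new cloud node; return None when no such record exists.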
    def find_stale_node(self, stale_time):
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None


class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """

    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.node_quota = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}        # Actor IDs to ComputeNodeSetupActors
        self.sizes_booting = {}  # Actor IDs to node size

    def on_start(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
                          node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)
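
    # Start a ComputeNodeMonitorActor for a newly discovered cloud node,
    # subscribe the daemon to its shutdown suggestions and the monitor to
    # cloud poll updates for that node, then wrap the pieces in a record.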
    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        return record

    def _register_cloud_node(self, node):
        rec = self.cloud_nodes.get(node.id)
        if rec is None:
            self._logger.info("Registering new cloud node %s", node.id)
            record = self._new_node(node)
            self.cloud_nodes.add(record)
        else:
            rec.cloud_node = node

    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for _, node in self.cloud_nodes.update_from(nodelist):
            self._register_cloud_node(node)

        self.try_pairing()

        for record in self.cloud_nodes.orphans.itervalues():
            if record.shutdown_actor:
                try:
                    record.shutdown_actor.stop()
                except pykka.ActorDeadError:
                    pass
                record.shutdown_actor = None

            # A recently booted node is a node that successfully completed the
            # setup actor but has not yet appeared in the cloud node list.
            # This will have the tag _nodemanager_recently_booted on it, which
            # means (if we're not shutting it down) we want to put it back into
            # the cloud node list.  Once it really appears in the cloud list,
            # the object in record.cloud_node will be replaced by a new one
            # that lacks the "_nodemanager_recently_booted" tag.
            if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                self.cloud_nodes.add(record)
            else:
                # Node disappeared from the cloud node list.  Stop the monitor
                # actor if necessary and forget about the node.
                if record.actor:
                    try:
                        record.actor.stop()
                    except pykka.ActorDeadError:
                        pass
                    record.actor = None
                record.cloud_node = None

    def _register_arvados_node(self, key, arv_node):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=arv_node)
        self.arvados_nodes.add(record)

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._register_arvados_node(key, node)
        self.try_pairing()

    def try_pairing(self):
        for record in self.cloud_nodes.unpaired():
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break

    def _nodes_booting(self, size):
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting[c].id == size.id)
        return s

    def _node_states(self, size):
        proxy_states = []
        states = []
        for rec in self.cloud_nodes.nodes.itervalues():
            if size is None or rec.cloud_node.size.id == size.id:
                if rec.shutdown_actor is None and rec.actor is not None:
                    proxy_states.append(rec.actor.get_state())
                else:
                    states.append("shutdown")
        return states + pykka.get_all(proxy_states)

    def _update_tracker(self):
        updates = {
            k: 0
            for k in status.tracker.keys()
            if k.startswith('nodes_')
        }
        for s in self._node_states(size=None):
            updates.setdefault('nodes_'+s, 0)
            updates['nodes_'+s] += 1
        updates['nodes_wish'] = len(self.last_wishlist)
        status.tracker.update(updates)

    def _state_counts(self, size):
        states = self._node_states(size)
        counts = {"booting": self._nodes_booting(size),
                  "unpaired": 0, "busy": 0, "idle": 0,
                  "down": 0, "shutdown": 0}
        for s in states:
            counts[s] = counts[s] + 1
        return counts

    def _nodes_up(self, counts):
        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
        return up

    def _total_price(self):
        cost = 0
        cost += sum(self.sizes_booting[c].price
                    for c in self.booting.iterkeys())
        cost += sum(c.cloud_node.size.price
                    for c in self.cloud_nodes.nodes.itervalues())
        return cost

    def _size_wishlist(self, size):
        return sum(1 for c in self.last_wishlist if c.id == size.id)
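
    # How many nodes of this size to add (positive) or remove (negative):
    # wishlist demand minus nodes that are already up but not busy, clamped
    # by min_nodes, the current node quota, and max_total_price.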
    def _nodes_wanted(self, size):
        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
        under_min = self.min_nodes - total_node_count
        over_max = total_node_count - self.node_quota
        total_price = self._total_price()

        counts = self._state_counts(size)

        up_count = self._nodes_up(counts)
        busy_count = counts["busy"]
        wishlist_count = self._size_wishlist(size)

        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i",
                          size.name, wishlist_count, up_count,
                          counts["booting"], counts["unpaired"], counts["idle"],
                          busy_count, counts["down"], counts["shutdown"])

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        wanted = wishlist_count - (up_count - busy_count)
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted

    def _nodes_excess(self, size):
        counts = self._state_counts(size)
        up_count = self._nodes_up(counts)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - (counts["busy"] + self._size_wishlist(size))

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
        try:
            self._update_tracker()
        except Exception:
            self._logger.exception("while updating tracker")

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_setup_finished)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        return pykka.get_all([getattr(actor, name) for name in attr_names])
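
    # Subscribed on each ComputeNodeSetupActor: collect its results, stop it,
    # then either lower node_quota (if the cloud reported a quota error) or
    # register the new cloud node and let the quota grow again.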
    def node_setup_finished(self, setup_proxy):
        # Called when a SetupActor has completed.
        cloud_node, arvados_node, error = self._get_actor_attrs(
            setup_proxy, 'cloud_node', 'arvados_node', 'error')
        setup_proxy.stop()

        if cloud_node is None:
            # If cloud_node is None then the node create wasn't successful.
            if error == dispatch.QuotaExceeded:
                # We've hit a quota limit, so adjust node_quota to stop trying to
                # boot new nodes until the node count goes down.
                self.node_quota = len(self.cloud_nodes)
                self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
        else:
            # Node creation succeeded.  Update cloud node list.
            cloud_node._nodemanager_recently_booted = True
            self._register_cloud_node(cloud_node)

            # Different quota policies may be in force depending on the cloud
            # provider, account limits, and the specific mix of node sizes
            # that are already created.  If we are right at the quota limit,
            # we want to probe to see if the last quota still applies or if we
            # are allowed to create more nodes.
            #
            # For example, if the quota is actually based on core count, the
            # quota might be 20 single-core machines or 10 dual-core machines.
            # If we previously set node_quota to 10 dual-core machines, but are
            # now booting single-core machines (actual quota 20), we want to
            # allow the quota to expand so we don't get stuck at 10 machines
            # forever.
            if len(self.cloud_nodes) >= self.node_quota:
                self.node_quota = len(self.cloud_nodes)+1
                self._logger.warning("After successful boot setting node quota to %s", self.node_quota)

        self.node_quota = min(self.node_quota, self.max_nodes)
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]

    @_check_poll_freshness
    def stop_booting_node(self, size):
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                del self.sizes_booting[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        record = self.cloud_nodes[cloud_node_id]
        if record.shutdown_actor is not None:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        record.shutdown_actor = shutdown.proxy()
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
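
    # Subscribed on each ComputeNodeMonitorActor: when a monitor reports that
    # its node could be shut down, decide whether to actually start a
    # ComputeNodeShutdownActor for it.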
    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        try:
            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
                self._begin_node_shutdown(node_actor, cancellable=True)
            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
                # Node is unpaired, which means it probably exceeded its booting
                # grace period without a ping, so shut it down so we can boot a new
                # node in its place.
                self._begin_node_shutdown(node_actor, cancellable=False)
            elif node_actor.in_state('down').get():
                # Node is down and unlikely to come back.
                self._begin_node_shutdown(node_actor, cancellable=False)
        except pykka.ActorDeadError as e:
            # The monitor actor sends shutdown suggestions every time the
            # node's state is updated, and these go into the daemon actor's
            # message queue.  It's possible that the node has already been shut
            # down (which shuts down the node monitor actor).  In that case,
            # this message is stale and we'll get ActorDeadError when we try to
            # access node_actor.  Log the error.
            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)

    def node_finished_shutdown(self, shutdown_actor):
        try:
            cloud_node, success = self._get_actor_attrs(
                shutdown_actor, 'cloud_node', 'success')
        except pykka.ActorDeadError:
            return
        cloud_node_id = cloud_node.id
        record = self.cloud_nodes[cloud_node_id]
        shutdown_actor.stop()
        record.shutdown_actor = None

        if not success:
            return

        # Shutdown was successful, so stop the monitor actor, otherwise it
        # will keep offering the node as a candidate for shutdown.
        record.actor.stop()
        record.actor = None

        # If the node went from being booted to being shut down without ever
        # appearing in the cloud node list, it will have the
        # _nodemanager_recently_booted tag, so get rid of it so that the node
        # can be forgotten completely.
        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
            del record.cloud_node._nodemanager_recently_booted

    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes

        # Shut down the poller actors.
        self._server_wishlist_actor.stop()
        self._arvados_nodes_actor.stop()
        self._cloud_nodes_actor.stop()

        # Clear cloud node list
        self.update_cloud_nodes([])

        # Stop setup actors unless they are in the middle of setup.
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()