2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
14 from . import computenode as cnode
16 from .computenode import dispatch
17 from .config import actor_class
19 class _ComputeNodeRecord(object):
20 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21 assignment_time=float('-inf')):
23 self.cloud_node = cloud_node
24 self.arvados_node = arvados_node
25 self.assignment_time = assignment_time
26 self.shutdown_actor = None
28 class _BaseNodeTracker(object):
33 # Proxy the methods listed below to self.nodes.
34 def _proxy_method(name):
35 method = getattr(dict, name)
36 @functools.wraps(method, ('__name__', '__doc__'))
37 def wrapper(self, *args, **kwargs):
38 return method(self.nodes, *args, **kwargs)
41 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42 locals()[_method_name] = _proxy_method(_method_name)
def record_key(self, record):
    """Return the key that identifies `record` in this tracker.

    The key is produced by applying `item_key` to the value stored in the
    record attribute named by `RECORD_ATTR`.
    """
    make_key = self.item_key
    tracked_item = getattr(record, self.RECORD_ATTR)
    return make_key(tracked_item)
def add(self, record):
    """Store `record` in `self.nodes` under the key given by `record_key`.

    An existing entry with the same key is overwritten.
    """
    key = self.record_key(record)
    self.nodes[key] = record
def update_record(self, key, item):
    """Replace the tracked item on the record stored under `key`.

    Looks up `self.nodes[key]` (raising KeyError when the key is unknown)
    and sets that record's `RECORD_ATTR` attribute to `item`.
    """
    record = self.nodes[key]
    setattr(record, self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
59 self.update_record(key, item)
62 self.orphans = {key: self.nodes.pop(key) for key in unseen}
65 return (record for record in self.nodes.itervalues()
66 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Node tracker whose records are keyed by the id of their cloud node.

    RECORD_ATTR names the record attribute holding the tracked item and
    PAIR_ATTR names the attribute holding its Arvados counterpart.
    """
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Cloud nodes are identified by their `id` attribute.
        return cloud_node.id
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76 RECORD_ATTR = 'arvados_node'
77 PAIR_ATTR = 'cloud_node'
78 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
80 def find_stale_node(self, stale_time):
81 # Try to select a stale node record that has an assigned slot first
82 for record in sorted(self.nodes.itervalues(),
83 key=lambda r: r.arvados_node['slot_number'],
85 node = record.arvados_node
86 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
88 not cnode.timestamp_fresh(record.assignment_time,
94 class NodeManagerDaemonActor(actor_class):
95 """Node Manager daemon.
97 This actor subscribes to all information polls about cloud nodes,
98 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
99 for every cloud node, subscribing them to poll updates
100 appropriately. It creates and destroys cloud nodes based on job queue
101 demand, and stops the corresponding ComputeNode actors when their work
104 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105 cloud_nodes_actor, cloud_update_actor, timer_actor,
106 arvados_factory, cloud_factory,
107 shutdown_windows, server_calculator,
108 min_nodes, max_nodes,
109 poll_stale_after=600,
110 boot_fail_after=1800,
111 node_stale_after=7200,
112 node_setup_class=dispatch.ComputeNodeSetupActor,
113 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114 node_actor_class=dispatch.ComputeNodeMonitorActor,
116 super(NodeManagerDaemonActor, self).__init__()
117 self._node_setup = node_setup_class
118 self._node_shutdown = node_shutdown_class
119 self._node_actor = node_actor_class
120 self._cloud_updater = cloud_update_actor
121 self._timer = timer_actor
122 self._new_arvados = arvados_factory
123 self._new_cloud = cloud_factory
124 self._cloud_driver = self._new_cloud()
125 self._later = self.actor_ref.tell_proxy()
126 self.shutdown_windows = shutdown_windows
127 self.server_calculator = server_calculator
128 self.min_cloud_size = self.server_calculator.cheapest_size()
129 self.min_nodes = min_nodes
130 self.max_nodes = max_nodes
131 self.node_quota = max_nodes
132 self.max_total_price = max_total_price
133 self.poll_stale_after = poll_stale_after
134 self.boot_fail_after = boot_fail_after
135 self.node_stale_after = node_stale_after
137 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
138 poll_actor = locals()[poll_name + '_actor']
139 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
140 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
141 self.last_polls[poll_name] = -self.poll_stale_after
142 self.cloud_nodes = _CloudNodeTracker()
143 self.arvados_nodes = _ArvadosNodeTracker()
144 self.booting = {} # Actor IDs to ComputeNodeSetupActors
145 self.sizes_booting = {} # Actor IDs to node size
148 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149 self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
    """Record the current `time.time()` as the latest poll for `poll_key`."""
    now = time.time()
    self.last_polls[poll_key] = now
def _pair_nodes(self, node_record, arvados_node):
    """Pair a cloud node record with an Arvados node.

    Logs the pairing, subscribes the record's monitor actor to updates for
    the Arvados node's uuid, stores the Arvados node on the record, and
    adds the record to the Arvados node tracker.
    """
    cloud_name = node_record.cloud_node.name
    uuid = arvados_node['uuid']
    hostname = arvados_node['hostname']
    self._logger.info(
        "Cloud node %s is now paired with Arvados node %s with hostname %s",
        cloud_name, uuid, hostname)

    on_update = node_record.actor.update_arvados_node
    self._arvados_nodes_actor.subscribe_to(uuid, on_update)

    node_record.arvados_node = arvados_node
    self.arvados_nodes.add(node_record)
162 def _new_node(self, cloud_node):
163 start_time = self._cloud_driver.node_start_time(cloud_node)
164 shutdown_timer = cnode.ShutdownTimer(start_time,
165 self.shutdown_windows)
166 actor = self._node_actor.start(
167 cloud_node=cloud_node,
168 cloud_node_start_time=start_time,
169 shutdown_timer=shutdown_timer,
170 update_actor=self._cloud_updater,
171 timer_actor=self._timer,
173 poll_stale_after=self.poll_stale_after,
174 node_stale_after=self.node_stale_after,
175 cloud_client=self._cloud_driver,
176 boot_fail_after=self.boot_fail_after)
177 actorTell = actor.tell_proxy()
178 actorTell.subscribe(self._later.node_can_shutdown)
179 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
180 actorTell.update_cloud_node)
181 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
184 def _register_cloud_node(self, node):
185 rec = self.cloud_nodes.get(node.id)
187 self._logger.info("Registering new cloud node %s", node.id)
188 record = self._new_node(node)
189 self.cloud_nodes.add(record)
191 rec.cloud_node = node
193 def update_cloud_nodes(self, nodelist):
194 self._update_poll_time('cloud_nodes')
195 for _, node in self.cloud_nodes.update_from(nodelist):
196 self._register_cloud_node(node)
200 for record in self.cloud_nodes.orphans.itervalues():
201 if record.shutdown_actor:
203 record.shutdown_actor.stop()
204 except pykka.ActorDeadError:
206 record.shutdown_actor = None
208 # A recently booted node is a node that successfully completed the
209 # setup actor but has not yet appeared in the cloud node list.
210 # This will have the tag _nodemanager_recently_booted on it, which
211 # means (if we're not shutting it down) we want to put it back into
212 # the cloud node list. Once it really appears in the cloud list,
213 # the object in record.cloud_node will be replaced by a new one
214 # that lacks the "_nodemanager_recently_booted" tag.
215 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
216 self.cloud_nodes.add(record)
218 # Node disappeared from the cloud node list. Stop the monitor
219 # actor if necessary and forget about the node.
222 # If it's paired and idle, stop its idle time counter
223 # before removing the monitor actor.
224 if record.actor.get_state().get() == 'idle':
225 status.tracker.idle_out(
226 record.actor.arvados_node.get()['hostname'])
228 except pykka.ActorDeadError:
231 record.cloud_node = None
def _register_arvados_node(self, key, arv_node):
    """Start tracking an Arvados node that has no record yet.

    Logs the registration under `key`, wraps `arv_node` in a new
    _ComputeNodeRecord, and adds that record to the Arvados node tracker.
    """
    self._logger.info("Registering new Arvados node %s", key)
    new_record = _ComputeNodeRecord(arvados_node=arv_node)
    tracker = self.arvados_nodes
    tracker.add(new_record)
238 def update_arvados_nodes(self, nodelist):
239 self._update_poll_time('arvados_nodes')
240 for key, node in self.arvados_nodes.update_from(nodelist):
241 self._register_arvados_node(key, node)
244 def try_pairing(self):
245 for record in self.cloud_nodes.unpaired():
246 for arv_rec in self.arvados_nodes.unpaired():
247 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
248 self._pair_nodes(record, arv_rec.arvados_node)
251 def _nodes_booting(self, size):
253 for c in self.booting.iterkeys()
254 if size is None or self.sizes_booting[c].id == size.id)
257 def _node_states(self, size):
260 for rec in self.cloud_nodes.nodes.itervalues():
261 if size is None or rec.cloud_node.size.id == size.id:
262 if rec.shutdown_actor is None and rec.actor is not None:
263 proxy_states.append(rec.actor.get_state())
265 states.append("shutdown")
266 return states + pykka.get_all(proxy_states)
268 def _update_tracker(self):
271 for k in status.tracker.keys()
272 if k.startswith('nodes_')
274 for s in self._node_states(size=None):
275 updates.setdefault('nodes_'+s, 0)
276 updates['nodes_'+s] += 1
277 updates['nodes_wish'] = len(self.last_wishlist)
278 updates['node_quota'] = self.node_quota
279 status.tracker.update(updates)
281 def _state_counts(self, size):
282 states = self._node_states(size)
284 "booting": self._nodes_booting(size),
293 counts[s] = counts[s] + 1
296 def _nodes_up(self, counts):
297 up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
300 def _total_price(self):
302 cost += sum(self.sizes_booting[c].price
303 for c in self.booting.iterkeys())
304 cost += sum(c.cloud_node.size.price
305 for c in self.cloud_nodes.nodes.itervalues())
def _size_wishlist(self, size):
    """Count the entries of `self.last_wishlist` whose id equals `size.id`."""
    matching = 0
    for wanted in self.last_wishlist:
        # size.id is read per entry, so an empty wishlist never touches size.
        if wanted.id == size.id:
            matching += 1
    return matching
311 def _nodes_wanted(self, size):
312 total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
313 under_min = self.min_nodes - total_node_count
314 over_max = total_node_count - self.node_quota
315 total_price = self._total_price()
317 counts = self._state_counts(size)
319 up_count = self._nodes_up(counts)
320 busy_count = counts["busy"]
321 wishlist_count = self._size_wishlist(size)
323 self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
330 counts["down"]+counts["fail"],
335 elif under_min > 0 and size.id == self.min_cloud_size.id:
338 wanted = wishlist_count - (up_count - busy_count)
339 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
340 can_boot = int((self.max_total_price - total_price) / size.price)
342 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
343 size.name, size.price, self.max_total_price, total_price)
def _nodes_excess(self, size):
    """Return how many nodes of `size` exceed current need.

    Starts from the up-node count for `size` (via `_state_counts` and
    `_nodes_up`). When `size` is the minimum cloud size, `min_nodes` is
    subtracted first. The busy count and the wishlist count for `size`
    are then subtracted. The result may be zero or negative.
    """
    state_counts = self._state_counts(size)
    surplus = self._nodes_up(state_counts)
    is_min_size = (size.id == self.min_cloud_size.id)
    if is_min_size:
        # Nodes reserved by the min_nodes floor are never counted as excess.
        surplus = surplus - self.min_nodes
    needed = state_counts["busy"] + self._size_wishlist(size)
    return surplus - needed
355 def update_server_wishlist(self, wishlist):
356 self._update_poll_time('server_wishlist')
357 requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
358 self.last_wishlist = wishlist[:requestable_nodes]
359 for size in reversed(self.server_calculator.cloud_sizes):
361 nodes_wanted = self._nodes_wanted(size)
363 self._later.start_node(size)
364 elif (nodes_wanted < 0) and self.booting:
365 self._later.stop_booting_node(size)
367 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
369 self._update_tracker()
371 self._logger.exception("while updating tracker")
373 def _check_poll_freshness(orig_func):
374 """Decorator to inhibit a method when poll information is stale.
376 This decorator checks the timestamps of all the poll information the
377 daemon has received. The decorated method is only called if none
378 of the timestamps are considered stale.
380 @functools.wraps(orig_func)
381 def wrapper(self, *args, **kwargs):
383 if all(now - t < self.poll_stale_after
384 for t in self.last_polls.itervalues()):
385 return orig_func(self, *args, **kwargs)
390 @_check_poll_freshness
391 def start_node(self, cloud_size):
392 nodes_wanted = self._nodes_wanted(cloud_size)
395 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
396 self._logger.info("Want %i more %s nodes. Booting a node.",
397 nodes_wanted, cloud_size.name)
398 new_setup = self._node_setup.start(
399 timer_actor=self._timer,
400 arvados_client=self._new_arvados(),
401 arvados_node=arvados_node,
402 cloud_client=self._new_cloud(),
403 cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
404 self.booting[new_setup.actor_ref.actor_urn] = new_setup
405 self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
407 if arvados_node is not None:
408 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
410 new_setup.subscribe(self._later.node_setup_finished)
412 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes from `actor` and resolve them together.

    Each name in `attr_names` is read from `actor` with getattr, and the
    collected values are passed to `pykka.get_all`, whose result is
    returned.
    """
    pending = [getattr(actor, attr_name) for attr_name in attr_names]
    return pykka.get_all(pending)
417 def node_setup_finished(self, setup_proxy):
418 # Called when a SetupActor has completed.
419 cloud_node, arvados_node, error = self._get_actor_attrs(
420 setup_proxy, 'cloud_node', 'arvados_node', 'error')
423 if cloud_node is None:
424 # If cloud_node is None then the node create wasn't successful.
425 if error == dispatch.QuotaExceeded:
426 # We've hit a quota limit, so adjust node_quota to stop trying to
427 # boot new nodes until the node count goes down.
428 self.node_quota = len(self.cloud_nodes)
429 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
431 # Node creation succeeded. Update cloud node list.
432 cloud_node._nodemanager_recently_booted = True
433 self._register_cloud_node(cloud_node)
435 # Different quota policies may be in force depending on the cloud
436 # provider, account limits, and the specific mix of node sizes
437 # that are already created. If we are right at the quota limit,
438 # we want to probe to see if the last quota still applies or if we
439 # are allowed to create more nodes.
441 # For example, if the quota is actually based on core count, the
442 # quota might be 20 single-core machines or 10 dual-core machines.
443 # If we previously set node_quota to 10 dual core machines, but are
444 # now booting single core machines (actual quota 20), we want to
445 # allow the quota to expand so we don't get stuck at 10 machines
447 if len(self.cloud_nodes) >= self.node_quota:
448 self.node_quota = len(self.cloud_nodes)+1
449 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
451 self.node_quota = min(self.node_quota, self.max_nodes)
452 del self.booting[setup_proxy.actor_ref.actor_urn]
453 del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
455 @_check_poll_freshness
456 def stop_booting_node(self, size):
457 nodes_excess = self._nodes_excess(size)
458 if (nodes_excess < 1) or not self.booting:
460 for key, node in self.booting.iteritems():
461 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
462 del self.booting[key]
463 del self.sizes_booting[key]
466 self._later.stop_booting_node(size)
469 def _begin_node_shutdown(self, node_actor, cancellable):
470 cloud_node_obj = node_actor.cloud_node.get()
471 cloud_node_id = cloud_node_obj.id
472 record = self.cloud_nodes[cloud_node_id]
473 if record.shutdown_actor is not None:
475 shutdown = self._node_shutdown.start(
476 timer_actor=self._timer, cloud_client=self._new_cloud(),
477 arvados_client=self._new_arvados(),
478 node_monitor=node_actor.actor_ref, cancellable=cancellable)
479 record.shutdown_actor = shutdown.proxy()
480 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
482 @_check_poll_freshness
483 def node_can_shutdown(self, node_actor):
485 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
486 self._begin_node_shutdown(node_actor, cancellable=True)
487 elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
488 # Node is unpaired, which means it probably exceeded its booting
489 # grace period without a ping, so shut it down so we can boot a new
491 self._begin_node_shutdown(node_actor, cancellable=False)
492 elif node_actor.in_state('down', 'fail').get():
493 # Node is down and unlikely to come back.
494 self._begin_node_shutdown(node_actor, cancellable=False)
495 except pykka.ActorDeadError as e:
496 # The monitor actor sends shutdown suggestions every time the
497 # node's state is updated, and these go into the daemon actor's
498 # message queue. It's possible that the node has already been shut
499 # down (which shuts down the node monitor actor). In that case,
500 # this message is stale and we'll get ActorDeadError when we try to
501 # access node_actor. Log the error.
502 self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
504 def node_finished_shutdown(self, shutdown_actor):
506 cloud_node, success = self._get_actor_attrs(
507 shutdown_actor, 'cloud_node', 'success')
508 except pykka.ActorDeadError:
510 cloud_node_id = cloud_node.id
513 shutdown_actor.stop()
514 except pykka.ActorDeadError:
518 record = self.cloud_nodes[cloud_node_id]
520 # Cloud node was already removed from the cloud node list
521 # supposedly while the destroy_node call was finishing its
524 record.shutdown_actor = None
529 # Shutdown was successful, so stop the monitor actor, otherwise it
530 # will keep offering the node as a candidate for shutdown.
534 # If the node went from being booted to being shut down without ever
535 # appearing in the cloud node list, it will have the
536 # _nodemanager_recently_booted tag, so get rid of it so that the node
537 # can be forgotten completely.
538 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
539 del record.cloud_node._nodemanager_recently_booted
542 self._logger.info("Shutting down after signal.")
543 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
546 self._server_wishlist_actor.stop()
547 self._arvados_nodes_actor.stop()
548 self._cloud_nodes_actor.stop()
550 # Clear cloud node list
551 self.update_cloud_nodes([])
553 # Stop setup actors unless they are in the middle of setup.
554 setup_stops = {key: node.stop_if_no_cloud_node()
555 for key, node in self.booting.iteritems()}
556 self.booting = {key: self.booting[key]
557 for key in setup_stops if not setup_stops[key].get()}
558 self._later.await_shutdown()
560 def await_shutdown(self):
562 self._timer.schedule(time.time() + 1, self._later.await_shutdown)