2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
14 from . import computenode as cnode
16 from .computenode import dispatch
17 from .config import actor_class
19 class _ComputeNodeRecord(object):
20 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21 assignment_time=float('-inf')):
23 self.cloud_node = cloud_node
24 self.arvados_node = arvados_node
25 self.assignment_time = assignment_time
26 self.shutdown_actor = None
28 class _BaseNodeTracker(object):
33 # Proxy the methods listed below to self.nodes.
34 def _proxy_method(name):
35 method = getattr(dict, name)
36 @functools.wraps(method, ('__name__', '__doc__'))
37 def wrapper(self, *args, **kwargs):
38 return method(self.nodes, *args, **kwargs)
41 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42 locals()[_method_name] = _proxy_method(_method_name)
44 def record_key(self, record):
45 return self.item_key(getattr(record, self.RECORD_ATTR))
47 def add(self, record):
48 self.nodes[self.record_key(record)] = record
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
59 self.update_record(key, item)
62 self.orphans = {key: self.nodes.pop(key) for key in unseen}
65 return (record for record in self.nodes.itervalues()
66 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track node records keyed by the cloud provider's node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda node: node.id)
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records keyed by their UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a stale, unassigned node record, or None.

        A record is stale when both its Arvados modification time and its
        last assignment time are older than `stale_time`.
        Fix: the loop was truncated — the sort's reverse flag, the second
        half of the staleness test, and both return paths were missing.
        """
        # Try to select a stale node record that has an assigned slot first.
        for record in sorted(self.nodes.values(),
                             key=lambda r: r.arvados_node['slot_number'],
                             reverse=True):
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return record
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
104 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105 cloud_nodes_actor, cloud_update_actor, timer_actor,
106 arvados_factory, cloud_factory,
107 shutdown_windows, server_calculator,
108 min_nodes, max_nodes,
109 poll_stale_after=600,
110 boot_fail_after=1800,
111 node_stale_after=7200,
112 node_setup_class=dispatch.ComputeNodeSetupActor,
113 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114 node_actor_class=dispatch.ComputeNodeMonitorActor,
116 consecutive_idle_count=1):
117 super(NodeManagerDaemonActor, self).__init__()
118 self._node_setup = node_setup_class
119 self._node_shutdown = node_shutdown_class
120 self._node_actor = node_actor_class
121 self._cloud_updater = cloud_update_actor
122 self._timer = timer_actor
123 self._new_arvados = arvados_factory
124 self._new_cloud = cloud_factory
125 self._cloud_driver = self._new_cloud()
126 self._later = self.actor_ref.tell_proxy()
127 self.shutdown_windows = shutdown_windows
128 self.server_calculator = server_calculator
129 self.min_cloud_size = self.server_calculator.cheapest_size()
130 self.min_nodes = min_nodes
131 self.max_nodes = max_nodes
132 self.node_quota = max_nodes
133 self.max_total_price = max_total_price
134 self.poll_stale_after = poll_stale_after
135 self.boot_fail_after = boot_fail_after
136 self.node_stale_after = node_stale_after
137 self.consecutive_idle_count = consecutive_idle_count
139 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
140 poll_actor = locals()[poll_name + '_actor']
141 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
142 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
143 self.last_polls[poll_name] = -self.poll_stale_after
144 self.cloud_nodes = _CloudNodeTracker()
145 self.arvados_nodes = _ArvadosNodeTracker()
146 self.booting = {} # Actor IDs to ComputeNodeSetupActors
147 self.sizes_booting = {} # Actor IDs to node size
150 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
151 self._logger.debug("Daemon started")
153 def _update_poll_time(self, poll_key):
154 self.last_polls[poll_key] = time.time()
156 def _pair_nodes(self, node_record, arvados_node):
157 self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
158 node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
159 self._arvados_nodes_actor.subscribe_to(
160 arvados_node['uuid'], node_record.actor.update_arvados_node)
161 node_record.arvados_node = arvados_node
162 self.arvados_nodes.add(node_record)
164 def _new_node(self, cloud_node):
165 start_time = self._cloud_driver.node_start_time(cloud_node)
166 shutdown_timer = cnode.ShutdownTimer(start_time,
167 self.shutdown_windows)
168 actor = self._node_actor.start(
169 cloud_node=cloud_node,
170 cloud_node_start_time=start_time,
171 shutdown_timer=shutdown_timer,
172 update_actor=self._cloud_updater,
173 timer_actor=self._timer,
175 poll_stale_after=self.poll_stale_after,
176 node_stale_after=self.node_stale_after,
177 cloud_client=self._cloud_driver,
178 boot_fail_after=self.boot_fail_after,
179 consecutive_idle_count=self.consecutive_idle_count)
180 actorTell = actor.tell_proxy()
181 actorTell.subscribe(self._later.node_can_shutdown)
182 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
183 actorTell.update_cloud_node)
184 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
187 def _register_cloud_node(self, node):
188 rec = self.cloud_nodes.get(node.id)
190 self._logger.info("Registering new cloud node %s", node.id)
191 record = self._new_node(node)
192 self.cloud_nodes.add(record)
194 rec.cloud_node = node
196 def update_cloud_nodes(self, nodelist):
197 self._update_poll_time('cloud_nodes')
198 for _, node in self.cloud_nodes.update_from(nodelist):
199 self._register_cloud_node(node)
203 for record in self.cloud_nodes.orphans.itervalues():
204 if record.shutdown_actor:
206 record.shutdown_actor.stop()
207 except pykka.ActorDeadError:
209 record.shutdown_actor = None
211 # A recently booted node is a node that successfully completed the
212 # setup actor but has not yet appeared in the cloud node list.
213 # This will have the tag _nodemanager_recently_booted on it, which
214 # means (if we're not shutting it down) we want to put it back into
215 # the cloud node list. Once it really appears in the cloud list,
216 # the object in record.cloud_node will be replaced by a new one
217 # that lacks the "_nodemanager_recently_booted" tag.
218 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
219 self.cloud_nodes.add(record)
221 # Node disappeared from the cloud node list. If it's paired,
222 # remove its idle time counter.
223 if record.arvados_node:
224 status.tracker.idle_out(record.arvados_node.get('hostname'))
225 # Stop the monitor actor if necessary and forget about the node.
229 except pykka.ActorDeadError:
232 record.cloud_node = None
234 def _register_arvados_node(self, key, arv_node):
235 self._logger.info("Registering new Arvados node %s", key)
236 record = _ComputeNodeRecord(arvados_node=arv_node)
237 self.arvados_nodes.add(record)
239 def update_arvados_nodes(self, nodelist):
240 self._update_poll_time('arvados_nodes')
241 for key, node in self.arvados_nodes.update_from(nodelist):
242 self._register_arvados_node(key, node)
245 def try_pairing(self):
246 for record in self.cloud_nodes.unpaired():
247 for arv_rec in self.arvados_nodes.unpaired():
248 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
249 self._pair_nodes(record, arv_rec.arvados_node)
252 def _nodes_booting(self, size):
254 for c in self.booting.iterkeys()
255 if size is None or self.sizes_booting[c].id == size.id)
258 def _node_states(self, size):
261 for rec in self.cloud_nodes.nodes.itervalues():
262 if size is None or rec.cloud_node.size.id == size.id:
263 if rec.shutdown_actor is None and rec.actor is not None:
264 proxy_states.append(rec.actor.get_state())
266 states.append("shutdown")
267 return states + pykka.get_all(proxy_states)
269 def _update_tracker(self):
272 for k in status.tracker.keys()
273 if k.startswith('nodes_')
275 for s in self._node_states(size=None):
276 updates.setdefault('nodes_'+s, 0)
277 updates['nodes_'+s] += 1
278 updates['nodes_wish'] = len(self.last_wishlist)
279 updates['node_quota'] = self.node_quota
280 status.tracker.update(updates)
282 def _state_counts(self, size):
283 states = self._node_states(size)
285 "booting": self._nodes_booting(size),
294 counts[s] = counts[s] + 1
297 def _nodes_up(self, counts):
298 up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
301 def _total_price(self):
303 cost += sum(self.sizes_booting[c].price
304 for c in self.booting.iterkeys())
305 cost += sum(c.cloud_node.size.price
306 for c in self.cloud_nodes.nodes.itervalues())
309 def _size_wishlist(self, size):
310 return sum(1 for c in self.last_wishlist if c.id == size.id)
312 def _nodes_wanted(self, size):
313 total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
314 under_min = self.min_nodes - total_node_count
315 over_max = total_node_count - self.node_quota
316 total_price = self._total_price()
318 counts = self._state_counts(size)
320 up_count = self._nodes_up(counts)
321 busy_count = counts["busy"]
322 wishlist_count = self._size_wishlist(size)
324 self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
331 counts["down"]+counts["fail"],
336 elif under_min > 0 and size.id == self.min_cloud_size.id:
339 wanted = wishlist_count - (up_count - busy_count)
340 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
341 can_boot = int((self.max_total_price - total_price) / size.price)
343 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
344 size.id, size.price, self.max_total_price, total_price)
349 def _nodes_excess(self, size):
350 counts = self._state_counts(size)
351 up_count = self._nodes_up(counts)
352 if size.id == self.min_cloud_size.id:
353 up_count -= self.min_nodes
354 return up_count - (counts["busy"] + self._size_wishlist(size))
356 def update_server_wishlist(self, wishlist):
357 self._update_poll_time('server_wishlist')
358 requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
359 self.last_wishlist = wishlist[:requestable_nodes]
360 for size in reversed(self.server_calculator.cloud_sizes):
362 nodes_wanted = self._nodes_wanted(size)
364 self._later.start_node(size)
365 elif (nodes_wanted < 0) and self.booting:
366 self._later.stop_booting_node(size)
368 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
370 self._update_tracker()
372 self._logger.exception("while updating tracker")
374 def _check_poll_freshness(orig_func):
375 """Decorator to inhibit a method when poll information is stale.
377 This decorator checks the timestamps of all the poll information the
378 daemon has received. The decorated method is only called if none
379 of the timestamps are considered stale.
381 @functools.wraps(orig_func)
382 def wrapper(self, *args, **kwargs):
384 if all(now - t < self.poll_stale_after
385 for t in self.last_polls.itervalues()):
386 return orig_func(self, *args, **kwargs)
391 @_check_poll_freshness
392 def start_node(self, cloud_size):
393 nodes_wanted = self._nodes_wanted(cloud_size)
397 if not self.cancel_node_shutdown(cloud_size):
398 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
399 self._logger.info("Want %i more %s nodes. Booting a node.",
400 nodes_wanted, cloud_size.id)
401 new_setup = self._node_setup.start(
402 timer_actor=self._timer,
403 arvados_client=self._new_arvados(),
404 arvados_node=arvados_node,
405 cloud_client=self._new_cloud(),
406 cloud_size=self.server_calculator.find_size(cloud_size.id))
407 self.booting[new_setup.actor_urn] = new_setup.proxy()
408 self.sizes_booting[new_setup.actor_urn] = cloud_size
410 if arvados_node is not None:
411 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
413 new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
416 self._later.start_node(cloud_size)
418 def _get_actor_attrs(self, actor, *attr_names):
419 return pykka.get_all([getattr(actor, name) for name in attr_names])
421 def node_setup_finished(self, setup_proxy):
422 # Called when a SetupActor has completed.
423 cloud_node, arvados_node, error = self._get_actor_attrs(
424 setup_proxy, 'cloud_node', 'arvados_node', 'error')
427 if cloud_node is None:
428 # If cloud_node is None then the node create wasn't successful.
429 if error == dispatch.QuotaExceeded:
430 # We've hit a quota limit, so adjust node_quota to stop trying to
431 # boot new nodes until the node count goes down.
432 self.node_quota = len(self.cloud_nodes)
433 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
435 # Node creation succeeded. Update cloud node list.
436 cloud_node._nodemanager_recently_booted = True
437 self._register_cloud_node(cloud_node)
439 # Different quota policies may in force depending on the cloud
440 # provider, account limits, and the specific mix of nodes sizes
441 # that are already created. If we are right at the quota limit,
442 # we want to probe to see if the last quota still applies or if we
443 # are allowed to create more nodes.
445 # For example, if the quota is actually based on core count, the
446 # quota might be 20 single-core machines or 10 dual-core machines.
447 # If we previously set node_quota to 10 dual core machines, but are
448 # now booting single core machines (actual quota 20), we want to
449 # allow the quota to expand so we don't get stuck at 10 machines
451 if len(self.cloud_nodes) >= self.node_quota:
452 self.node_quota = len(self.cloud_nodes)+1
453 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
455 self.node_quota = min(self.node_quota, self.max_nodes)
456 del self.booting[setup_proxy.actor_ref.actor_urn]
457 del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
459 @_check_poll_freshness
460 def stop_booting_node(self, size):
461 nodes_excess = self._nodes_excess(size)
462 if (nodes_excess < 1) or not self.booting:
464 for key, node in self.booting.iteritems():
466 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
467 del self.booting[key]
468 del self.sizes_booting[key]
470 self._later.stop_booting_node(size)
472 except pykka.Timeout:
475 @_check_poll_freshness
476 def cancel_node_shutdown(self, size):
477 # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
478 for record in self.cloud_nodes.nodes.itervalues():
480 if (record.shutdown_actor is not None and
481 record.cloud_node.size.id == size.id and
482 record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
484 except (pykka.ActorDeadError, pykka.Timeout) as e:
488 def _begin_node_shutdown(self, node_actor, cancellable):
489 cloud_node_obj = node_actor.cloud_node.get()
490 cloud_node_id = cloud_node_obj.id
491 record = self.cloud_nodes[cloud_node_id]
492 if record.shutdown_actor is not None:
494 shutdown = self._node_shutdown.start(
495 timer_actor=self._timer, cloud_client=self._new_cloud(),
496 arvados_client=self._new_arvados(),
497 node_monitor=node_actor.actor_ref, cancellable=cancellable)
498 record.shutdown_actor = shutdown.proxy()
499 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
501 @_check_poll_freshness
502 def node_can_shutdown(self, node_actor):
504 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
505 self._begin_node_shutdown(node_actor, cancellable=True)
506 elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
507 # Node is unpaired, which means it probably exceeded its booting
508 # grace period without a ping, so shut it down so we can boot a new
510 self._begin_node_shutdown(node_actor, cancellable=False)
511 elif node_actor.in_state('down', 'fail').get():
512 # Node is down and unlikely to come back.
513 self._begin_node_shutdown(node_actor, cancellable=False)
514 except pykka.ActorDeadError as e:
515 # The monitor actor sends shutdown suggestions every time the
516 # node's state is updated, and these go into the daemon actor's
517 # message queue. It's possible that the node has already been shut
518 # down (which shuts down the node monitor actor). In that case,
519 # this message is stale and we'll get ActorDeadError when we try to
520 # access node_actor. Log the error.
521 self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
523 def node_finished_shutdown(self, shutdown_actor):
525 cloud_node, success = self._get_actor_attrs(
526 shutdown_actor, 'cloud_node', 'success')
527 except pykka.ActorDeadError:
529 cloud_node_id = cloud_node.id
532 shutdown_actor.stop()
533 except pykka.ActorDeadError:
537 record = self.cloud_nodes[cloud_node_id]
539 # Cloud node was already removed from the cloud node list
540 # supposedly while the destroy_node call was finishing its
543 record.shutdown_actor = None
548 # Shutdown was successful, so stop the monitor actor, otherwise it
549 # will keep offering the node as a candidate for shutdown.
553 # If the node went from being booted to being shut down without ever
554 # appearing in the cloud node list, it will have the
555 # _nodemanager_recently_booted tag, so get rid of it so that the node
556 # can be forgotten completely.
557 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
558 del record.cloud_node._nodemanager_recently_booted
561 self._logger.info("Shutting down after signal.")
562 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
565 self._server_wishlist_actor.stop()
566 self._arvados_nodes_actor.stop()
567 self._cloud_nodes_actor.stop()
569 # Clear cloud node list
570 self.update_cloud_nodes([])
572 # Stop setup actors unless they are in the middle of setup.
573 setup_stops = {key: node.stop_if_no_cloud_node()
574 for key, node in self.booting.iteritems()}
575 self.booting = {key: self.booting[key]
576 for key in setup_stops if not setup_stops[key].get()}
577 self._later.await_shutdown()
579 def await_shutdown(self):
581 self._timer.schedule(time.time() + 1, self._later.await_shutdown)