3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
13 from .computenode import dispatch
14 from .config import actor_class
# Mutable record tying together one node's cloud_node object, its Arvados
# node record, its monitor actor and (while shutting down) its shutdown actor.
# assignment_time defaults to float('-inf'); start_node stamps it when an
# Arvados node record is reused for a new boot.
# NOTE(review): original line 19 is missing from this excerpt. Other code reads
# record.actor, so it presumably stores the `actor` argument; confirm.
16 class _ComputeNodeRecord(object):
17 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
18 assignment_time=float('-inf')):
20 self.cloud_node = cloud_node
21 self.arvados_node = arvados_node
22 self.assignment_time = assignment_time
# shutdown_actor is set by _begin_node_shutdown and cleared when shutdown ends.
23 self.shutdown_actor = None
# Base class for a dict-backed collection of _ComputeNodeRecords.
# Subclasses define:
#   RECORD_ATTR - record attribute holding the tracked item (cloud or Arvados node)
#   PAIR_ATTR   - record attribute that is None while the record is unpaired
#   item_key    - maps a tracked item to its key in self.nodes
# NOTE(review): original lines 26-29 (presumably initializing self.nodes and
# self.orphans), 36-37, 52-58 and 60-61 are missing from this excerpt.
25 class _BaseNodeTracker(object):
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
# Assigning through locals() relies on the class body's locals() being the
# real class namespace (CPython behavior). The Python docs advise against
# modifying locals().
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
# Key for a record: item_key applied to the record's RECORD_ATTR item.
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
# Insert or replace the record under its computed key.
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def update_record(self, key, item):
48 setattr(self.nodes[key], self.RECORD_ATTR, item)
# Sync with a fresh poll response. Callers iterate the result as (key, item)
# pairs and register each pair as new (the loop body is partly missing here).
# Records whose keys were never seen in the response are moved to self.orphans.
50 def update_from(self, response):
51 unseen = set(self.nodes.iterkeys())
53 key = self.item_key(item)
56 self.update_record(key, item)
59 self.orphans = {key: self.nodes.pop(key) for key in unseen}
# Generator over records whose PAIR_ATTR is None (def line not visible;
# try_pairing calls this as .unpaired()).
62 return (record for record in self.nodes.itervalues()
63 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker of compute node records, keyed by each cloud node's id.

    A record counts as unpaired while its arvados_node attribute is None.
    """

    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Cloud nodes are identified by their provider-assigned id.
        return cloud_node.id
# Tracker of compute node records keyed by the Arvados node's 'uuid'.
# A record counts as unpaired while its cloud_node attribute is None.
72 class _ArvadosNodeTracker(_BaseNodeTracker):
73 RECORD_ATTR = 'arvados_node'
74 PAIR_ATTR = 'cloud_node'
75 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
# Scan the tracked records for one whose Arvados node mtime is not fresh
# and whose assignment_time is not fresh either (relative to stale_time).
# start_node uses the result as an Arvados node dict or None.
# NOTE(review): original lines 81 and 83-87 (rest of the condition and the
# return statements) are missing from this excerpt.
77 def find_stale_node(self, stale_time):
78 for record in self.nodes.itervalues():
79 node = record.arvados_node
80 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
82 not cnode.timestamp_fresh(record.assignment_time,
# Top-level daemon actor (base class is actor_class from .config).
# NOTE(review): the end of this class docstring (original lines 96-97) is
# missing from this excerpt.
88 class NodeManagerDaemonActor(actor_class):
89 """Node Manager daemon.
91 This actor subscribes to all information polls about cloud nodes,
92 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
93 for every cloud node, subscribing them to poll updates
94 appropriately. It creates and destroys cloud nodes based on job queue
95 demand, and stops the corresponding ComputeNode actors when their work
# Store the collaborating actors and factories, sizing limits and timeouts.
# Then subscribe to the three poll actors and create the node trackers.
# NOTE(review): original line 109 (a parameter; max_total_price is read below
# but not visible in this signature), line 130 (presumably self.last_polls = {})
# and lines 140-141 (presumably initializing self.last_wishlist, which
# _update_tracker and _size_wishlist read) are missing from this excerpt.
98 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
99 cloud_nodes_actor, cloud_update_actor, timer_actor,
100 arvados_factory, cloud_factory,
101 shutdown_windows, server_calculator,
102 min_nodes, max_nodes,
103 poll_stale_after=600,
104 boot_fail_after=1800,
105 node_stale_after=7200,
106 node_setup_class=dispatch.ComputeNodeSetupActor,
107 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
108 node_actor_class=dispatch.ComputeNodeMonitorActor,
110 super(NodeManagerDaemonActor, self).__init__()
111 self._node_setup = node_setup_class
112 self._node_shutdown = node_shutdown_class
113 self._node_actor = node_actor_class
114 self._cloud_updater = cloud_update_actor
115 self._timer = timer_actor
116 self._new_arvados = arvados_factory
117 self._new_cloud = cloud_factory
118 self._cloud_driver = self._new_cloud()
# Proxy for sending messages to this actor itself (used for deferred calls).
119 self._later = self.actor_ref.tell_proxy()
120 self.shutdown_windows = shutdown_windows
121 self.server_calculator = server_calculator
122 self.min_cloud_size = self.server_calculator.cheapest_size()
123 self.min_nodes = min_nodes
124 self.max_nodes = max_nodes
# node_quota starts at max_nodes and is lowered or raised by
# node_setup_finished, but is always capped at max_nodes.
125 self.node_quota = max_nodes
126 self.max_total_price = max_total_price
127 self.poll_stale_after = poll_stale_after
128 self.boot_fail_after = boot_fail_after
129 self.node_stale_after = node_stale_after
# locals() looks up the constructor arguments server_wishlist_actor,
# arvados_nodes_actor and cloud_nodes_actor by name.
131 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
132 poll_actor = locals()[poll_name + '_actor']
133 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
134 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
# Seed each poll time far in the past so _check_poll_freshness treats every
# poll as stale until its first real update arrives.
135 self.last_polls[poll_name] = -self.poll_stale_after
136 self.cloud_nodes = _CloudNodeTracker()
137 self.arvados_nodes = _ArvadosNodeTracker()
138 self.booting = {} # Actor IDs to ComputeNodeSetupActors
139 self.sizes_booting = {} # Actor IDs to node size
# NOTE(review): actor_urn[33:] presumably keeps the trailing part of the actor
# URN as a short per-actor logger suffix; confirm the URN format.
142 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
143 self._logger.debug("Daemon started")
145 def _update_poll_time(self, poll_key):
146 self.last_polls[poll_key] = time.time()
148 def _pair_nodes(self, node_record, arvados_node):
149 self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
150 node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
151 self._arvados_nodes_actor.subscribe_to(
152 arvados_node['uuid'], node_record.actor.update_arvados_node)
153 node_record.arvados_node = arvados_node
154 self.arvados_nodes.add(node_record)
# Start a monitor actor for cloud_node, configured with its start time, a
# ShutdownTimer over self.shutdown_windows, the shared timer and updater
# actors, and this daemon's staleness/boot-failure timeouts. Then:
#  - register this daemon's node_can_shutdown as a subscriber of the monitor,
#  - route cloud poll updates for cloud_node.id to the monitor,
#  - wrap the monitor proxy and cloud node in a _ComputeNodeRecord.
# NOTE(review): original line 167 (one keyword argument to start()) and line
# 177 (presumably `return record`; _register_cloud_node adds the result to
# self.cloud_nodes) are missing from this excerpt.
156 def _new_node(self, cloud_node):
157 start_time = self._cloud_driver.node_start_time(cloud_node)
158 shutdown_timer = cnode.ShutdownTimer(start_time,
159 self.shutdown_windows)
160 actor = self._node_actor.start(
161 cloud_node=cloud_node,
162 cloud_node_start_time=start_time,
163 shutdown_timer=shutdown_timer,
164 cloud_fqdn_func=self._cloud_driver.node_fqdn,
165 update_actor=self._cloud_updater,
166 timer_actor=self._timer,
168 poll_stale_after=self.poll_stale_after,
169 node_stale_after=self.node_stale_after,
170 cloud_client=self._cloud_driver,
171 boot_fail_after=self.boot_fail_after)
172 actorTell = actor.tell_proxy()
173 actorTell.subscribe(self._later.node_can_shutdown)
174 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
175 actorTell.update_cloud_node)
176 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
# Create a record (and monitor actor) for a cloud node not yet tracked, or
# replace the cloud_node object on the existing record.
# NOTE(review): original lines 181 and 185 (presumably `if rec is None:` and
# `else:`) are missing from this excerpt.
179 def _register_cloud_node(self, node):
180 rec = self.cloud_nodes.get(node.id)
182 self._logger.info("Registering new cloud node %s", node.id)
183 record = self._new_node(node)
184 self.cloud_nodes.add(record)
186 rec.cloud_node = node
# Handle a cloud node poll result:
# 1. record the poll time and register every newly listed node;
# 2. for each record that vanished from the listing (orphans), stop any
#    shutdown actor, then re-add recently booted nodes or tear the record down.
# NOTE(review): original lines 192-194, 197, 200, 202, 212, 215-217 and
# 219-220 are missing from this excerpt.
188 def update_cloud_nodes(self, nodelist):
189 self._update_poll_time('cloud_nodes')
190 for _, node in self.cloud_nodes.update_from(nodelist):
191 self._register_cloud_node(node)
195 for record in self.cloud_nodes.orphans.itervalues():
196 if record.shutdown_actor:
198 record.shutdown_actor.stop()
# The shutdown actor may already have exited on its own.
199 except pykka.ActorDeadError:
201 record.shutdown_actor = None
203 # A recently booted node is a node that successfully completed the
204 # setup actor but has not yet appeared in the cloud node list.
205 # This will have the tag _nodemanager_recently_booted on it, which
206 # means (if we're not shutting it down) we want to put it back into
207 # the cloud node list. Once it really appears in the cloud list,
208 # the object in record.cloud_node will be replaced by a new one
209 # that lacks the "_nodemanager_recently_booted" tag.
210 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
211 self.cloud_nodes.add(record)
213 # Node disappeared from the cloud node list. Stop the monitor
214 # actor if necessary and forget about the node.
218 except pykka.ActorDeadError:
221 record.cloud_node = None
def _register_arvados_node(self, key, arv_node):
    """Begin tracking an Arvados node seen for the first time.

    The node is wrapped in a fresh _ComputeNodeRecord that has no cloud node
    or monitor actor yet. It is added to self.arvados_nodes, keyed by uuid.
    """
    self._logger.info("Registering new Arvados node %s", key)
    self.arvados_nodes.add(_ComputeNodeRecord(arvados_node=arv_node))
# Handle an Arvados node poll result: record the poll time and register every
# newly listed Arvados node.
# NOTE(review): original line 232 is missing from this excerpt (possibly a
# follow-up call such as try_pairing; confirm).
228 def update_arvados_nodes(self, nodelist):
229 self._update_poll_time('arvados_nodes')
230 for key, node in self.arvados_nodes.update_from(nodelist):
231 self._register_arvados_node(key, node)
# For each unpaired cloud record that has a monitor actor, offer it each
# unpaired Arvados node. The first offer the actor accepts triggers
# _pair_nodes. .get() blocks until the monitor actor answers the offer.
# NOTE(review): original line 239 is missing from this excerpt. It is
# presumably `break`; without it one cloud record could be offered further
# Arvados nodes after pairing.
234 def try_pairing(self):
235 for record in self.cloud_nodes.unpaired():
236 for arv_rec in self.arvados_nodes.unpaired():
237 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
238 self._pair_nodes(record, arv_rec.arvados_node)
# Count setup actors in self.booting. When size is given, only those whose
# recorded size (self.sizes_booting) has the same id are counted; size=None
# counts all of them.
# NOTE(review): original lines 242 (start of the sum expression) and 245 (the
# return) are missing from this excerpt.
241 def _nodes_booting(self, size):
243 for c in self.booting.iterkeys()
244 if size is None or self.sizes_booting[c].id == size.id)
# List the state strings of tracked cloud nodes (optionally only those of
# `size`). Records with a live monitor and no shutdown actor contribute the
# monitor's get_state() result, collected in one pykka.get_all call.
# The other records contribute "shutdown".
# NOTE(review): original lines 248-249 (presumably initializing states and
# proxy_states) and 254 (presumably `else:`) are missing from this excerpt.
247 def _node_states(self, size):
250 for rec in self.cloud_nodes.nodes.itervalues():
251 if size is None or rec.cloud_node.size.id == size.id:
252 if rec.shutdown_actor is None and rec.actor is not None:
253 proxy_states.append(rec.actor.get_state())
255 states.append("shutdown")
256 return states + pykka.get_all(proxy_states)
# Publish node statistics to status.tracker: one "nodes_<state>" counter per
# node state across all sizes, plus "nodes_wish" = length of the last wishlist.
# NOTE(review): original lines 259-260 and 263 are missing. They presumably
# start `updates` with the existing nodes_* keys of status.tracker so stale
# states are reset; confirm. `status` is imported outside this excerpt.
258 def _update_tracker(self):
261 for k in status.tracker.keys()
262 if k.startswith('nodes_')
264 for s in self._node_states(size=None):
265 updates.setdefault('nodes_'+s, 0)
266 updates['nodes_'+s] += 1
267 updates['nodes_wish'] = len(self.last_wishlist)
268 status.tracker.update(updates)
# Build a dict of node counts per state for `size`. Callers read the keys
# "booting", "unpaired", "idle", "busy", "down" and "shutdown".
# "booting" comes from _nodes_booting; the rest are tallied from _node_states.
# NOTE(review): original lines 272, 274-280 and 282 (the rest of the dict
# literal, the tally loop header and the return) are missing from this excerpt.
270 def _state_counts(self, size):
271 states = self._node_states(size)
273 "booting": self._nodes_booting(size),
281 counts[s] = counts[s] + 1
# Number of nodes considered "up": booting + unpaired + idle + busy
# (down and shutdown nodes are excluded).
# NOTE(review): original line 286 (presumably `return up`) is missing.
284 def _nodes_up(self, counts):
285 up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
# Sum of size prices for all booting setup actors plus all tracked cloud
# nodes. _nodes_wanted compares it to max_total_price.
# NOTE(review): original lines 289 (presumably `cost = 0`) and 294 (the return)
# are missing from this excerpt.
288 def _total_price(self):
290 cost += sum(self.sizes_booting[c].price
291 for c in self.booting.iterkeys())
292 cost += sum(c.cloud_node.size.price
293 for c in self.cloud_nodes.nodes.itervalues())
296 def _size_wishlist(self, size):
297 return sum(1 for c in self.last_wishlist if c.id == size.id)
# Decide how many more nodes of `size` to boot. Inputs considered:
# - min_nodes, applied only to the cheapest size;
# - node_quota (over_max);
# - wishlist demand minus idle capacity (up - busy);
# - max_total_price, which limits the count to what the budget still allows.
# NOTE(review): original lines 304, 306, 310, 312-322, 324-325, 329 and
# 332-334 are missing from this excerpt, including the return statements.
# NOTE(review): if max_total_price is set and size.price is 0 while the total
# already exceeds the budget, the division computing can_boot raises
# ZeroDivisionError; confirm prices are always positive.
# NOTE(review): the log text "because with it would exceed" reads awkwardly
# (runtime string, left unchanged here).
299 def _nodes_wanted(self, size):
300 total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
301 under_min = self.min_nodes - total_node_count
302 over_max = total_node_count - self.node_quota
303 total_price = self._total_price()
305 counts = self._state_counts(size)
307 up_count = self._nodes_up(counts)
308 busy_count = counts["busy"]
309 wishlist_count = self._size_wishlist(size)
311 self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
323 elif under_min > 0 and size.id == self.min_cloud_size.id:
# Demand not covered by idle capacity: wishlist entries minus non-busy up nodes.
326 wanted = wishlist_count - (up_count - busy_count)
327 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
328 can_boot = int((self.max_total_price - total_price) / size.price)
330 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
331 size.name, size.price, self.max_total_price, total_price)
336 def _nodes_excess(self, size):
337 counts = self._state_counts(size)
338 up_count = self._nodes_up(counts)
339 if size.id == self.min_cloud_size.id:
340 up_count -= self.min_nodes
341 return up_count - (counts["busy"] + self._size_wishlist(size))
# Handle a new server wishlist. Record the poll time and keep the wishlist.
# Then, for each cloud size (in reverse order of server_calculator.cloud_sizes),
# schedule start_node or stop_booting_node according to _nodes_wanted.
# Finally refresh status.tracker.
# A failure for one size is logged with its id; failures updating the tracker
# are logged too.
# NOTE(review): original lines 347, 349, 355 and 357 (the try/if/except
# headers) are missing from this excerpt. The bound name `e` below is unused.
343 def update_server_wishlist(self, wishlist):
344 self._update_poll_time('server_wishlist')
345 self.last_wishlist = wishlist
346 for size in reversed(self.server_calculator.cloud_sizes):
348 nodes_wanted = self._nodes_wanted(size)
350 self._later.start_node(size)
351 elif (nodes_wanted < 0) and self.booting:
352 self._later.stop_booting_node(size)
353 except Exception as e:
354 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
356 self._update_tracker()
358 self._logger.exception("while updating tracker")
# Decorator used inside the class body (on start_node, stop_booting_node and
# node_can_shutdown). The wrapped method runs only if every timestamp in
# self.last_polls is younger than self.poll_stale_after seconds.
# Setting poll_stale_after to -1 (as the shutdown sequence does) therefore
# inhibits all decorated methods.
# NOTE(review): original lines 362, 366 (the docstring terminator), 369
# (presumably `now = time.time()`) and 373-375 (the stale branch and
# `return wrapper`) are missing from this excerpt.
360 def _check_poll_freshness(orig_func):
361 """Decorator to inhibit a method when poll information is stale.
363 This decorator checks the timestamps of all the poll information the
364 daemon has received. The decorated method is only called if none
365 of the timestamps are considered stale.
367 @functools.wraps(orig_func)
368 def wrapper(self, *args, **kwargs):
370 if all(now - t < self.poll_stale_after
371 for t in self.last_polls.itervalues()):
372 return orig_func(self, *args, **kwargs)
# Boot one node of cloud_size when more are wanted (only runs when polls are
# fresh). Steps:
# - reuse a stale Arvados node record if find_stale_node returns one;
# - start a setup actor and track it in self.booting / self.sizes_booting
#   under its actor URN;
# - stamp the reused record's assignment_time;
# - subscribe to setup completion, then schedule another start_node for this
#   size.
# NOTE(review): original lines 380-381 (presumably the early exit when no nodes
# are wanted), 393, 396 (the assigned time value) and 398 (presumably the
# condition for re-scheduling) are missing from this excerpt.
377 @_check_poll_freshness
378 def start_node(self, cloud_size):
379 nodes_wanted = self._nodes_wanted(cloud_size)
382 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
383 self._logger.info("Want %i more %s nodes. Booting a node.",
384 nodes_wanted, cloud_size.name)
385 new_setup = self._node_setup.start(
386 timer_actor=self._timer,
387 arvados_client=self._new_arvados(),
388 arvados_node=arvados_node,
389 cloud_client=self._new_cloud(),
390 cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
391 self.booting[new_setup.actor_ref.actor_urn] = new_setup
392 self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
394 if arvados_node is not None:
395 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
397 new_setup.subscribe(self._later.node_setup_finished)
399 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Read several attributes from an actor proxy and wait for all of them.

    Each attribute read on the proxy yields a future. All reads are issued
    first, then pykka.get_all resolves them. The values are returned in
    attr_names order.
    """
    pending = [getattr(actor, attr_name) for attr_name in attr_names]
    return pykka.get_all(pending)
# Handle completion of a setup actor.
# - Failure (no cloud_node) with QuotaExceeded: clamp node_quota to the
#   current cloud node count.
# - Success: tag the node as recently booted, register it, and raise
#   node_quota by one if the count has reached it.
# In every case node_quota is capped at max_nodes and the setup actor is
# dropped from self.booting / self.sizes_booting.
# NOTE(review): original lines 408-409, 417 (presumably `else:`), 421, 427,
# 433 and 437 are missing from this excerpt.
404 def node_setup_finished(self, setup_proxy):
405 # Called when a SetupActor has completed.
406 cloud_node, arvados_node, error = self._get_actor_attrs(
407 setup_proxy, 'cloud_node', 'arvados_node', 'error')
410 if cloud_node is None:
411 # If cloud_node is None then the node create wasn't successful.
412 if error == dispatch.QuotaExceeded:
413 # We've hit a quota limit, so adjust node_quota to stop trying to
414 # boot new nodes until the node count goes down.
415 self.node_quota = len(self.cloud_nodes)
416 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
418 # Node creation succeeded. Update cloud node list.
419 cloud_node._nodemanager_recently_booted = True
420 self._register_cloud_node(cloud_node)
422 # Different quota policies may be in force depending on the cloud
423 # provider, account limits, and the specific mix of nodes sizes
424 # that are already created. If we are right at the quota limit,
425 # we want to probe to see if the last quota still applies or if we
426 # are allowed to create more nodes.
428 # For example, if the quota is actually based on core count, the
429 # quota might be 20 single-core machines or 10 dual-core machines.
430 # If we previously set node_quota to 10 dual core machines, but are
431 # now booting single core machines (actual quota 20), we want to
432 # allow the quota to expand so we don't get stuck at 10 machines
434 if len(self.cloud_nodes) >= self.node_quota:
435 self.node_quota = len(self.cloud_nodes)+1
436 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
438 self.node_quota = min(self.node_quota, self.max_nodes)
439 del self.booting[setup_proxy.actor_ref.actor_urn]
440 del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
# Cancel one booting node of `size` when there is excess capacity (only runs
# when polls are fresh). A setup actor is stopped only if
# stop_if_no_cloud_node() reports success, i.e. it has not produced a cloud
# node yet.
# NOTE(review): original lines 446 (presumably the early return), 451-452 and
# 454 are missing from this excerpt. Deleting from self.booting while
# iterating self.booting.iteritems() raises RuntimeError unless the loop exits
# right after the deletion; confirm a `break` follows.
442 @_check_poll_freshness
443 def stop_booting_node(self, size):
444 nodes_excess = self._nodes_excess(size)
445 if (nodes_excess < 1) or not self.booting:
447 for key, node in self.booting.iteritems():
448 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
449 del self.booting[key]
450 del self.sizes_booting[key]
453 self._later.stop_booting_node(size)
# Start a shutdown actor for the node monitored by node_actor, unless its
# record already has one. The new proxy is stored on the record, and
# node_finished_shutdown is subscribed to the shutdown actor's result.
# `cancellable` is passed through to the shutdown actor.
# NOTE(review): original line 461 (presumably the early return) is missing.
456 def _begin_node_shutdown(self, node_actor, cancellable):
457 cloud_node_obj = node_actor.cloud_node.get()
458 cloud_node_id = cloud_node_obj.id
459 record = self.cloud_nodes[cloud_node_id]
460 if record.shutdown_actor is not None:
462 shutdown = self._node_shutdown.start(
463 timer_actor=self._timer, cloud_client=self._new_cloud(),
464 arvados_client=self._new_arvados(),
465 node_monitor=node_actor.actor_ref, cancellable=cancellable)
466 record.shutdown_actor = shutdown.proxy()
467 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
# React to a monitor actor's shutdown suggestion (only when polls are fresh):
# - excess capacity for the node's size -> cancellable shutdown;
# - node never paired with an Arvados node -> non-cancellable shutdown;
# - node in 'down' state -> non-cancellable shutdown.
# NOTE(review): original lines 471 (presumably `try:`), 477 and 490 are missing.
# self.cloud_nodes.nodes.get(...) returns None for an untracked id, which
# would raise AttributeError here; confirm the id is always tracked.
469 @_check_poll_freshness
470 def node_can_shutdown(self, node_actor):
472 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
473 self._begin_node_shutdown(node_actor, cancellable=True)
474 elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
475 # Node is unpaired, which means it probably exceeded its booting
476 # grace period without a ping, so shut it down so we can boot a new
478 self._begin_node_shutdown(node_actor, cancellable=False)
479 elif node_actor.in_state('down').get():
480 # Node is down and unlikely to come back.
481 self._begin_node_shutdown(node_actor, cancellable=False)
482 except pykka.ActorDeadError as e:
483 # The monitor actor sends shutdown suggestions every time the
484 # node's state is updated, and these go into the daemon actor's
485 # message queue. It's possible that the node has already been shut
486 # down (which shuts down the node monitor actor). In that case,
487 # this message is stale and we'll get ActorDeadError when we try to
488 # access node_actor. Log the error.
489 self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
# Handle the result of a shutdown actor: read its cloud_node and success
# flag, stop it, and clear the record's shutdown_actor. After a successful
# shutdown, the monitor actor is stopped and the "recently booted" tag is
# removed so the node can be forgotten.
# NOTE(review): original lines 492 (presumably `try:`), 496, 501-504 and
# 507-509 are missing from this excerpt. self.cloud_nodes[cloud_node_id]
# raises KeyError if the record was already dropped; confirm that cannot happen.
491 def node_finished_shutdown(self, shutdown_actor):
493 cloud_node, success = self._get_actor_attrs(
494 shutdown_actor, 'cloud_node', 'success')
495 except pykka.ActorDeadError:
497 cloud_node_id = cloud_node.id
498 record = self.cloud_nodes[cloud_node_id]
499 shutdown_actor.stop()
500 record.shutdown_actor = None
505 # Shutdown was successful, so stop the monitor actor, otherwise it
506 # will keep offering the node as a candidate for shutdown.
510 # If the node went from being booted to being shut down without ever
511 # appearing in the cloud node list, it will have the
512 # _nodemanager_recently_booted tag, so get rid of it so that the node
513 # can be forgotten completely.
514 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
515 del record.cloud_node._nodemanager_recently_booted
# Body of the daemon's signal-triggered shutdown sequence.
# NOTE(review): its `def` line (original 517) and lines 520-521, 525, 528 and
# 535 are missing from this excerpt.
# Sequence:
# 1. Disable poll-gated work and stop the three poll actors.
# 2. Treat the cloud node list as empty so tracked records become orphans.
# 3. Keep in self.booting only setup actors that could not be stopped.
# 4. Hand off to await_shutdown.
# NOTE(review): self.sizes_booting is not pruned alongside self.booting here.
518 self._logger.info("Shutting down after signal.")
519 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
522 self._server_wishlist_actor.stop()
523 self._arvados_nodes_actor.stop()
524 self._cloud_nodes_actor.stop()
526 # Clear cloud node list
527 self.update_cloud_nodes([])
529 # Stop setup actors unless they are in the middle of setup.
530 setup_stops = {key: node.stop_if_no_cloud_node()
531 for key, node in self.booting.iteritems()}
532 self.booting = {key: self.booting[key]
533 for key in setup_stops if not setup_stops[key].get()}
534 self._later.await_shutdown()
# Wait for shutdown to finish by re-scheduling itself one second later
# through the timer actor.
# NOTE(review): original line 537 (the condition guarding the reschedule,
# presumably on self.booting) and lines 539-540 (the other branch) are
# missing from this excerpt.
536 def await_shutdown(self):
538 self._timer.schedule(time.time() + 1, self._later.await_shutdown)