from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class


class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
        self.shutdown_actor = None


class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)
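
    # Illustrative sketch (not executed): the loop above is equivalent to
    # defining, for each listed name,
    #
    #   def __contains__(self, *args, **kwargs):
    #       return dict.__contains__(self.nodes, *args, **kwargs)
    #
    # so a tracker can be queried like a read-only dict of its records:
    #
    #   key in tracker, tracker[key], len(tracker), tracker.get(key, None)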

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)


class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Return an Arvados node record that has neither been updated nor
        # been assigned to a setup actor recently, or None if there is none.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None
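
# Illustrative sketch (assumed data): update_from() yields (key, item) pairs
# for keys it has never seen, so the caller can build and add() new records;
# tracked keys that vanish from a later response are collected in .orphans.
#
#   tracker = _ArvadosNodeTracker()
#   list(tracker.update_from([arv_node]))  # -> [(arv_node['uuid'], arv_node)]
#   # After add()ing a record for that key, a response omitting it would
#   # move the record into tracker.orphans on the next update_from() call.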


class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """

    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
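
        # For example, the 'cloud_nodes' iteration of the loop above is
        # equivalent to:
        #
        #   cloud_nodes_actor.subscribe(self._later.update_cloud_nodes)
        #   self._cloud_nodes_actor = cloud_nodes_actor
        #   self.last_polls['cloud_nodes'] = -self.poll_stale_after
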
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.sizes_booting = {} # Actor IDs to node sizes
        self.last_wishlist = []
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        return record

    def _register_cloud_node(self, node):
        rec = self.cloud_nodes.get(node.id)
        if rec is None:
            self._logger.info("Registering new cloud node %s", node.id)
            record = self._new_node(node)
            self.cloud_nodes.add(record)
        else:
            rec.cloud_node = node

    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for _, node in self.cloud_nodes.update_from(nodelist):
            self._register_cloud_node(node)

        self.try_pairing()

        for record in self.cloud_nodes.orphans.itervalues():
            if record.shutdown_actor:
                try:
                    record.shutdown_actor.stop()
                except pykka.ActorDeadError:
                    pass
                record.shutdown_actor = None

            # A recently booted node is a node that successfully completed
            # the setup actor but has not yet appeared in the cloud node
            # list.  It will have the tag _nodemanager_recently_booted, which
            # means (if we're not shutting it down) we want to put it back
            # into the cloud node list.  Once it really appears in the cloud
            # list, the object in record.cloud_node will be replaced by a new
            # one that lacks the "_nodemanager_recently_booted" tag.
            if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                self.cloud_nodes.add(record)
            else:
                # The node disappeared from the cloud node list.  Stop the
                # monitor actor if necessary and forget about the node.
                if record.actor:
                    try:
                        record.actor.stop()
                    except pykka.ActorDeadError:
                        pass
                    record.actor = None
                record.cloud_node = None

    def _register_arvados_node(self, key, arv_node):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=arv_node)
        self.arvados_nodes.add(record)

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._register_arvados_node(key, node)
        self.try_pairing()

    def try_pairing(self):
        for record in self.cloud_nodes.unpaired():
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break

    def _nodes_booting(self, size):
        return sum(1
                   for c in self.booting.iterkeys()
                   if size is None or self.sizes_booting[c].id == size.id)

    def _node_states(self, size):
        proxy_states = []
        states = []
        for rec in self.cloud_nodes.nodes.itervalues():
            if size is None or rec.cloud_node.size.id == size.id:
                if rec.shutdown_actor is None and rec.actor is not None:
                    proxy_states.append(rec.actor.get_state())
                else:
                    states.append("shutdown")
        return states + pykka.get_all(proxy_states)

    def _state_counts(self, size):
        states = self._node_states(size)
        counts = {
            "booting": self._nodes_booting(size),
            "unpaired": 0,
            "idle": 0,
            "busy": 0,
            "down": 0,
            "shutdown": 0
        }
        for s in states:
            counts[s] = counts[s] + 1
        return counts
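
    # Illustrative example (assumed states): with one booting node of this
    # size and monitor states ['idle', 'busy', 'busy'], this returns
    #
    #   {"booting": 1, "unpaired": 0, "idle": 1, "busy": 2,
    #    "down": 0, "shutdown": 0}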

    def _nodes_up(self, counts):
        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
        return up

    def _total_price(self):
        cost = 0
        cost += sum(self.sizes_booting[c].price
                    for c in self.booting.iterkeys())
        cost += sum(c.cloud_node.size.price
                    for c in self.cloud_nodes.nodes.itervalues())
        return cost

    def _size_wishlist(self, size):
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _nodes_wanted(self, size):
        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
        under_min = self.min_nodes - total_node_count
        over_max = total_node_count - self.max_nodes
        total_price = self._total_price()

        counts = self._state_counts(size)

        up_count = self._nodes_up(counts)
        busy_count = counts["busy"]

        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i",
                          size.name,
                          self._size_wishlist(size),
                          up_count,
                          counts["booting"],
                          counts["unpaired"],
                          counts["idle"],
                          busy_count,
                          counts["down"],
                          counts["shutdown"])

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        wanted = self._size_wishlist(size) - (up_count - busy_count)
        if wanted > 0 and self.max_total_price and ((total_price + (size.price * wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because doing so would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted
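
    # Worked example (assumed figures): with max_total_price=10.0, a current
    # total_price of 8.0, and size.price=1.5, booting the 2 wanted nodes
    # would bring the total to 11.0 > 10.0, so the cap applies and
    # can_boot = int((10.0 - 8.0) / 1.5) = 1 node is requested instead.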

    def _nodes_excess(self, size):
        counts = self._state_counts(size)
        up_count = self._nodes_up(counts)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - (counts["busy"] + self._size_wishlist(size))
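
    # Worked example (assumed figures): with 6 nodes up for a size, 2 of
    # them busy, and a wishlist of 3 for that size, the excess is
    # 6 - (2 + 3) = 1 shutdown candidate (after subtracting min_nodes if
    # this is the cheapest size).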

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s", size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper
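
    # Sketch (assumed figures): with poll_stale_after=600 and a call at
    # now=1000, the wrapped method runs only if every timestamp in
    # self.last_polls is newer than 400; a single stale poller, e.g.
    # last_polls['cloud_nodes'] == 300, inhibits it.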

    @_check_poll_freshness
    def start_node(self, cloud_size):
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        # Called when a SetupActor has completed.
        cloud_node, arvados_node = self._get_actor_attrs(
            setup_proxy, 'cloud_node', 'arvados_node')
        setup_proxy.stop()

        # If cloud_node is None then the node create wasn't
        # successful and so there isn't anything to do.
        if cloud_node is not None:
            # Node creation succeeded.  Update cloud node list.
            cloud_node._nodemanager_recently_booted = True
            self._register_cloud_node(cloud_node)
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]

    @_check_poll_freshness
    def stop_booting_node(self, size):
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                del self.sizes_booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        record = self.cloud_nodes[cloud_node_id]
        if record.shutdown_actor is not None:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        record.shutdown_actor = shutdown.proxy()
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        try:
            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
                self._begin_node_shutdown(node_actor, cancellable=True)
            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
                # Node is unpaired, which means it probably exceeded its
                # booting grace period without a ping, so shut it down so we
                # can boot a new node in its place.
                self._begin_node_shutdown(node_actor, cancellable=False)
            elif node_actor.in_state('down').get():
                # Node is down and unlikely to come back.
                self._begin_node_shutdown(node_actor, cancellable=False)
        except pykka.ActorDeadError as e:
            # The monitor actor sends shutdown suggestions every time the
            # node's state is updated, and these go into the daemon actor's
            # message queue.  It's possible that the node has already been
            # shut down (which shuts down the node monitor actor).  In that
            # case, this message is stale and we'll get ActorDeadError when
            # we try to access node_actor.  Log the error.
            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)

    def node_finished_shutdown(self, shutdown_actor):
        try:
            cloud_node, success = self._get_actor_attrs(
                shutdown_actor, 'cloud_node', 'success')
        except pykka.ActorDeadError:
            return
        cloud_node_id = cloud_node.id
        record = self.cloud_nodes[cloud_node_id]
        shutdown_actor.stop()
        record.shutdown_actor = None

        if not success:
            return

        # Shutdown was successful, so stop the monitor actor, otherwise it
        # will keep offering the node as a candidate for shutdown.
        record.actor.stop()
        record.actor = None

        # If the node went from being booted to being shut down without ever
        # appearing in the cloud node list, it will have the
        # _nodemanager_recently_booted tag, so get rid of it so that the node
        # can be forgotten completely.
        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
            del record.cloud_node._nodemanager_recently_booted

    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()