from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from . import status
from .computenode import dispatch
from .config import actor_class

class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
        self.shutdown_actor = None

class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)
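
    # Note: the loop above runs at class definition time, so the generated
    # wrappers land in the class namespace via locals().  As a result
    # `key in tracker`, `tracker[key]`, `len(tracker)` and `tracker.get(key)`
    # all delegate straight to the underlying self.nodes dict.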

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)

class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
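
    # Pick an Arvados node record whose node has not been updated recently and
    # that has not recently been handed to a setup actor, so the record can be
    # reused for the next node we boot.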
    def find_stale_node(self, stale_time):
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None

class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
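
    # Rough wiring sketch (illustrative only, not part of this module): a
    # launcher is expected to start the poll actors and then this daemon,
    # roughly
    #
    #     daemon = NodeManagerDaemonActor.start(
    #         server_wishlist_actor, arvados_nodes_actor, cloud_nodes_actor,
    #         ...).proxy()
    #
    # __init__ below subscribes the daemon's update_* methods to each poll
    # actor, so fresh wishlist and node lists arrive as ordinary messages.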
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):  # a falsy value disables the price cap
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        # One-way proxy to ourself, used to queue follow-up work on the
        # daemon's own message queue.
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}        # Actor IDs to ComputeNodeSetupActors
        self.sizes_booting = {}  # Actor IDs to node size
        self.last_wishlist = []
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        return record

    def _register_cloud_node(self, node):
        rec = self.cloud_nodes.get(node.id)
        if rec is None:
            self._logger.info("Registering new cloud node %s", node.id)
            record = self._new_node(node)
            self.cloud_nodes.add(record)
        else:
            rec.cloud_node = node

    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for _, node in self.cloud_nodes.update_from(nodelist):
            self._register_cloud_node(node)

        self.try_pairing()

        for record in self.cloud_nodes.orphans.itervalues():
            if record.shutdown_actor:
                try:
                    record.shutdown_actor.stop()
                except pykka.ActorDeadError:
                    pass
                record.shutdown_actor = None

            # A recently booted node is a node that successfully completed the
            # setup actor but has not yet appeared in the cloud node list.
            # This will have the tag _nodemanager_recently_booted on it, which
            # means (if we're not shutting it down) we want to put it back into
            # the cloud node list.  Once it really appears in the cloud list,
            # the object in record.cloud_node will be replaced by a new one
            # that lacks the "_nodemanager_recently_booted" tag.
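            # (The tag is set in node_up() when a setup actor finishes, and
            # removed again in node_finished_shutdown().)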
            if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
                self.cloud_nodes.add(record)
            else:
                # Node disappeared from the cloud node list.  Stop the monitor
                # actor if necessary and forget about the node.
                if record.actor:
                    try:
                        record.actor.stop()
                    except pykka.ActorDeadError:
                        pass
                    record.actor = None
                record.cloud_node = None

    def _register_arvados_node(self, key, arv_node):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=arv_node)
        self.arvados_nodes.add(record)

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._register_arvados_node(key, node)
        self.try_pairing()

    def try_pairing(self):
        for record in self.cloud_nodes.unpaired():
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break

    def _nodes_booting(self, size):
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting[c].id == size.id)
        return s

    def _node_states(self, size):
        proxy_states = []
        states = []
        for rec in self.cloud_nodes.nodes.itervalues():
            if size is None or rec.cloud_node.size.id == size.id:
                if rec.shutdown_actor is None and rec.actor is not None:
                    proxy_states.append(rec.actor.get_state())
                else:
                    states.append("shutdown")
        return states + pykka.get_all(proxy_states)
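
    # The result is a flat list of state strings for the selected size, for
    # example (hypothetically) ["busy", "idle", "shutdown"]; _state_counts()
    # tallies them, and _update_tracker() feeds them into status.tracker.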

    def _update_tracker(self):
        updates = {
            k: 0
            for k in status.tracker.keys()
            if k.startswith('nodes_')
        }
        for s in self._node_states(size=None):
            updates.setdefault('nodes_'+s, 0)
            updates['nodes_'+s] += 1
        updates['nodes_wish'] = len(self.last_wishlist)
        status.tracker.update(updates)

    def _state_counts(self, size):
        states = self._node_states(size)
        counts = {
            "booting": self._nodes_booting(size),
            "unpaired": 0,
            "busy": 0,
            "idle": 0,
            "down": 0,
            "shutdown": 0
        }
        for s in states:
            counts[s] = counts[s] + 1
        return counts

    def _nodes_up(self, counts):
        up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
        return up

    def _total_price(self):
        cost = 0
        cost += sum(self.sizes_booting[c].price
                    for c in self.booting.iterkeys())
        cost += sum(c.cloud_node.size.price
                    for c in self.cloud_nodes.nodes.itervalues())
        return cost

    def _size_wishlist(self, size):
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _nodes_wanted(self, size):
        total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
        under_min = self.min_nodes - total_node_count
        over_max = total_node_count - self.max_nodes
        total_price = self._total_price()
        counts = self._state_counts(size)
        up_count = self._nodes_up(counts)
        busy_count = counts["busy"]
        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i",
                          size.name, self._size_wishlist(size), up_count,
                          counts["booting"], counts["unpaired"], counts["idle"],
                          counts["busy"], counts["down"], counts["shutdown"])

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min
        wanted = self._size_wishlist(size) - (up_count - busy_count)
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because booting it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted
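
    # Example of the arithmetic above with hypothetical numbers: a wishlist of
    # 10 nodes of this size with 6 up, 4 of them busy, gives
    # wanted = 10 - (6 - 4) = 8.  If max_total_price is 100, total_price is 90
    # and size.price is 2.5, only int((100 - 90) / 2.5) = 4 of them are booted.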

    def _nodes_excess(self, size):
        counts = self._state_counts(size)
        up_count = self._nodes_up(counts)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - (counts["busy"] + self._size_wishlist(size))
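
    # With hypothetical numbers: 5 nodes up, 2 busy and a wishlist of 1 gives
    # an excess of 5 - (2 + 1) = 2.  min_nodes is only counted against the
    # cheapest size, so the pool never shrinks below its configured floor.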

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s",
                                       getattr(size, "id", "(id not available)"))
        try:
            self._update_tracker()
        except Exception:
            self._logger.exception("while updating tracker")

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper
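
    # The methods below that create or destroy nodes (start_node,
    # stop_booting_node, node_can_shutdown) are wrapped with this decorator so
    # that sizing decisions are never made from stale poll data.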

    @_check_poll_freshness
    def start_node(self, cloud_size):
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes. Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        # Attribute access on a pykka proxy returns futures; resolve them all
        # in one pass.
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        # Called when a SetupActor has completed.
        cloud_node, arvados_node = self._get_actor_attrs(
            setup_proxy, 'cloud_node', 'arvados_node')
        setup_proxy.stop()

        # If cloud_node is None then the node create wasn't
        # successful and so there isn't anything to do.
        if cloud_node is not None:
            # Node creation succeeded.  Update cloud node list.
            cloud_node._nodemanager_recently_booted = True
            self._register_cloud_node(cloud_node)
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting[setup_proxy.actor_ref.actor_urn]

    @_check_poll_freshness
    def stop_booting_node(self, size):
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                del self.sizes_booting[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        record = self.cloud_nodes[cloud_node_id]
        if record.shutdown_actor is not None:
            # A shutdown is already in progress for this node.
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        record.shutdown_actor = shutdown.proxy()
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        try:
            if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
                self._begin_node_shutdown(node_actor, cancellable=True)
            elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
                # Node is unpaired, which means it probably exceeded its booting
                # grace period without a ping, so shut it down so we can boot a new
                # node in its place.
                self._begin_node_shutdown(node_actor, cancellable=False)
            elif node_actor.in_state('down').get():
                # Node is down and unlikely to come back.
                self._begin_node_shutdown(node_actor, cancellable=False)
        except pykka.ActorDeadError as e:
            # The monitor actor sends shutdown suggestions every time the
            # node's state is updated, and these go into the daemon actor's
            # message queue.  It's possible that the node has already been shut
            # down (which shuts down the node monitor actor).  In that case,
            # this message is stale and we'll get ActorDeadError when we try to
            # access node_actor.  Log the error.
            self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)

    def node_finished_shutdown(self, shutdown_actor):
        try:
            cloud_node, success = self._get_actor_attrs(
                shutdown_actor, 'cloud_node', 'success')
        except pykka.ActorDeadError:
            return
        cloud_node_id = cloud_node.id
        record = self.cloud_nodes[cloud_node_id]
        shutdown_actor.stop()
        record.shutdown_actor = None

        if not success:
            return

        # Shutdown was successful, so stop the monitor actor, otherwise it
        # will keep offering the node as a candidate for shutdown.
        record.actor.stop()
        record.actor = None

        # If the node went from being booted to being shut down without ever
        # appearing in the cloud node list, it will have the
        # _nodemanager_recently_booted tag, so get rid of it so that the node
        # can be forgotten completely.
        if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
            del record.cloud_node._nodemanager_recently_booted

    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()