3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
22 self.shutdown_actor = None
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud-provider node records, keyed by the provider node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a record that is safe to reuse for a new node, or None.

        A record is stale when its Arvados node record has not been
        modified within stale_time seconds AND it has not recently been
        assigned to a booting node (assignment_time is also stale).
        """
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return record
        return None
93 class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
123 self._cloud_driver = self._new_cloud()
124 self._later = self.actor_ref.tell_proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.max_total_price = max_total_price
131 self.poll_stale_after = poll_stale_after
132 self.boot_fail_after = boot_fail_after
133 self.node_stale_after = node_stale_after
135 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136 poll_actor = locals()[poll_name + '_actor']
137 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139 self.last_polls[poll_name] = -self.poll_stale_after
140 self.cloud_nodes = _CloudNodeTracker()
141 self.arvados_nodes = _ArvadosNodeTracker()
142 self.booting = {} # Actor IDs to ComputeNodeSetupActors
143 self.sizes_booting = {} # Actor IDs to node size
146 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
147 self._logger.debug("Daemon started")
149 def _update_poll_time(self, poll_key):
150 self.last_polls[poll_key] = time.time()
152 def _pair_nodes(self, node_record, arvados_node):
153 self._logger.info("Cloud node %s is now paired with Arvados node %s",
154 node_record.cloud_node.name, arvados_node['uuid'])
155 self._arvados_nodes_actor.subscribe_to(
156 arvados_node['uuid'], node_record.actor.update_arvados_node)
157 node_record.arvados_node = arvados_node
158 self.arvados_nodes.add(node_record)
160 def _new_node(self, cloud_node):
161 start_time = self._cloud_driver.node_start_time(cloud_node)
162 shutdown_timer = cnode.ShutdownTimer(start_time,
163 self.shutdown_windows)
164 actor = self._node_actor.start(
165 cloud_node=cloud_node,
166 cloud_node_start_time=start_time,
167 shutdown_timer=shutdown_timer,
168 cloud_fqdn_func=self._cloud_driver.node_fqdn,
169 update_actor=self._cloud_updater,
170 timer_actor=self._timer,
172 poll_stale_after=self.poll_stale_after,
173 node_stale_after=self.node_stale_after,
174 cloud_client=self._cloud_driver,
175 boot_fail_after=self.boot_fail_after)
176 actorTell = actor.tell_proxy()
177 actorTell.subscribe(self._later.node_can_shutdown)
178 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
179 actorTell.update_cloud_node)
180 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
183 def _register_cloud_node(self, node):
184 rec = self.cloud_nodes.get(node.id)
186 self._logger.info("Registering new cloud node %s", node.id)
187 record = self._new_node(node)
188 self.cloud_nodes.add(record)
190 rec.cloud_node = node
192 def update_cloud_nodes(self, nodelist):
193 self._update_poll_time('cloud_nodes')
194 for _, node in self.cloud_nodes.update_from(nodelist):
195 self._register_cloud_node(node)
199 for record in self.cloud_nodes.orphans.itervalues():
200 if record.shutdown_actor:
202 record.shutdown_actor.stop()
203 except pykka.ActorDeadError:
205 record.shutdown_actor = None
207 # A recently booted node is a node that successfully completed the
208 # setup actor but has not yet appeared in the cloud node list.
209 # This will have the tag _nodemanager_recently_booted on it, which
210 # means (if we're not shutting it down) we want to put it back into
211 # the cloud node list. Once it really appears in the cloud list,
212 # the object in record.cloud_node will be replaced by a new one
213 # that lacks the "_nodemanager_recently_booted" tag.
214 if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
215 self.cloud_nodes.add(record)
218 record.cloud_node = None
220 def _register_arvados_node(self, key, arv_node):
221 self._logger.info("Registering new Arvados node %s", key)
222 record = _ComputeNodeRecord(arvados_node=arv_node)
223 self.arvados_nodes.add(record)
225 def update_arvados_nodes(self, nodelist):
226 self._update_poll_time('arvados_nodes')
227 for key, node in self.arvados_nodes.update_from(nodelist):
228 self._register_arvados_node(key, node)
231 def try_pairing(self):
232 for record in self.cloud_nodes.unpaired():
233 for arv_rec in self.arvados_nodes.unpaired():
234 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
235 self._pair_nodes(record, arv_rec.arvados_node)
238 def _nodes_booting(self, size):
240 for c in self.booting.iterkeys()
241 if size is None or self.sizes_booting[c].id == size.id)
244 def _node_states(self, size):
245 states = pykka.get_all(rec.actor.get_state()
246 for rec in self.cloud_nodes.nodes.itervalues()
247 if ((size is None or rec.cloud_node.size.id == size.id) and
248 rec.shutdown_actor is None))
249 states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
250 if ((size is None or rec.cloud_node.size.id == size.id) and
251 rec.shutdown_actor is not None)]
254 def _state_counts(self, size):
255 states = self._node_states(size)
257 "booting": self._nodes_booting(size),
265 counts[s] = counts[s] + 1
268 def _nodes_up(self, counts):
269 up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
272 def _total_price(self):
274 cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price
275 for c in self.booting.iterkeys())
276 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
277 for c in self.cloud_nodes.nodes.itervalues())
280 def _size_wishlist(self, size):
281 return sum(1 for c in self.last_wishlist if c.id == size.id)
283 def _nodes_wanted(self, size):
284 total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
285 under_min = self.min_nodes - total_node_count
286 over_max = total_node_count - self.max_nodes
287 total_price = self._total_price()
289 counts = self._state_counts(size)
291 up_count = self._nodes_up(counts)
292 busy_count = counts["busy"]
294 self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
295 self._size_wishlist(size),
306 elif under_min > 0 and size.id == self.min_cloud_size.id:
309 wanted = self._size_wishlist(size) - (up_count - busy_count)
310 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
311 can_boot = int((self.max_total_price - total_price) / size.price)
313 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
314 size.name, size.price, self.max_total_price, total_price)
319 def _nodes_excess(self, size):
320 counts = self._state_counts(size)
321 up_count = self._nodes_up(counts)
322 if size.id == self.min_cloud_size.id:
323 up_count -= self.min_nodes
324 return up_count - (counts["busy"] + self._size_wishlist(size))
326 def update_server_wishlist(self, wishlist):
327 self._update_poll_time('server_wishlist')
328 self.last_wishlist = wishlist
329 for size in reversed(self.server_calculator.cloud_sizes):
331 nodes_wanted = self._nodes_wanted(size)
333 self._later.start_node(size)
334 elif (nodes_wanted < 0) and self.booting:
335 self._later.stop_booting_node(size)
336 except Exception as e:
337 self._logger.exception("while calculating nodes wanted for size %s", size)
339 def _check_poll_freshness(orig_func):
340 """Decorator to inhibit a method when poll information is stale.
342 This decorator checks the timestamps of all the poll information the
343 daemon has received. The decorated method is only called if none
344 of the timestamps are considered stale.
346 @functools.wraps(orig_func)
347 def wrapper(self, *args, **kwargs):
349 if all(now - t < self.poll_stale_after
350 for t in self.last_polls.itervalues()):
351 return orig_func(self, *args, **kwargs)
356 @_check_poll_freshness
357 def start_node(self, cloud_size):
358 nodes_wanted = self._nodes_wanted(cloud_size)
361 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
362 self._logger.info("Want %i more %s nodes. Booting a node.",
363 nodes_wanted, cloud_size.name)
364 new_setup = self._node_setup.start(
365 timer_actor=self._timer,
366 arvados_client=self._new_arvados(),
367 arvados_node=arvados_node,
368 cloud_client=self._new_cloud(),
369 cloud_size=cloud_size).proxy()
370 self.booting[new_setup.actor_ref.actor_urn] = new_setup
371 self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
373 if arvados_node is not None:
374 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
376 new_setup.subscribe(self._later.node_up)
378 self._later.start_node(cloud_size)
380 def _get_actor_attrs(self, actor, *attr_names):
381 return pykka.get_all([getattr(actor, name) for name in attr_names])
383 def node_up(self, setup_proxy):
384 # Called when a SetupActor has completed.
385 cloud_node, arvados_node = self._get_actor_attrs(
386 setup_proxy, 'cloud_node', 'arvados_node')
389 # If cloud_node is None then the node create wasn't
390 # successful and so there isn't anything to do.
391 if cloud_node is not None:
392 # Node creation succeeded. Update cloud node list.
393 cloud_node._nodemanager_recently_booted = True
394 self._register_cloud_node(cloud_node)
395 del self.booting[setup_proxy.actor_ref.actor_urn]
396 del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
398 @_check_poll_freshness
399 def stop_booting_node(self, size):
400 nodes_excess = self._nodes_excess(size)
401 if (nodes_excess < 1) or not self.booting:
403 for key, node in self.booting.iteritems():
404 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
405 del self.booting[key]
406 del self.sizes_booting[key]
409 self._later.stop_booting_node(size)
412 def _begin_node_shutdown(self, node_actor, cancellable):
413 cloud_node_obj = node_actor.cloud_node.get()
414 cloud_node_id = cloud_node_obj.id
415 record = self.cloud_nodes[cloud_node_id]
416 if record.shutdown_actor is not None:
418 shutdown = self._node_shutdown.start(
419 timer_actor=self._timer, cloud_client=self._new_cloud(),
420 arvados_client=self._new_arvados(),
421 node_monitor=node_actor.actor_ref, cancellable=cancellable)
422 record.shutdown_actor = shutdown.proxy()
423 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
425 @_check_poll_freshness
426 def node_can_shutdown(self, node_actor):
427 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
428 self._begin_node_shutdown(node_actor, cancellable=True)
429 elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
430 # Node is unpaired, which means it probably exceeded its booting
431 # grace period without a ping, so shut it down so we can boot a new
433 self._begin_node_shutdown(node_actor, cancellable=False)
434 elif node_actor.in_state('down').get():
435 # Node is down and unlikely to come back.
436 self._begin_node_shutdown(node_actor, cancellable=False)
438 def node_finished_shutdown(self, shutdown_actor):
440 cloud_node, success, cancel_reason = self._get_actor_attrs(
441 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
442 except pykka.ActorDeadError:
444 cloud_node_id = cloud_node.id
445 record = self.cloud_nodes[cloud_node_id]
446 shutdown_actor.stop()
448 if cancel_reason == self._node_shutdown.NODE_BROKEN:
449 self.cloud_nodes.blacklist(cloud_node_id)
450 record.shutdown_actor = None
452 # If the node went from being booted to being shut down without ever
453 # appearing in the cloud node list, it will have the
454 # _nodemanager_recently_booted tag, so get rid of it so that the node
455 # can be forgotten completely.
456 if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"):
457 del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted
460 self._logger.info("Shutting down after signal.")
461 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
462 setup_stops = {key: node.stop_if_no_cloud_node()
463 for key, node in self.booting.iteritems()}
464 self.booting = {key: self.booting[key]
465 for key in setup_stops if not setup_stops[key].get()}
466 self._later.await_shutdown()
468 def await_shutdown(self):
470 self._timer.schedule(time.time() + 1, self._later.await_shutdown)