3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    """Keyed collection of node records with pairing and blacklist support.

    Subclasses define RECORD_ATTR (the record attribute holding the tracked
    item), PAIR_ATTR (the attribute holding the paired counterpart), and
    item_key (a function mapping a tracked item to its dictionary key).
    """
    # NOTE(review): the __init__ header (and presumably `self.nodes = {}`)
    # is elided from this excerpt — confirm upstream.
        self._blacklist = set()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # NOTE(review): the `return wrapper` line is elided here.

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        # Key a record by its tracked item (e.g. cloud node ID or UUID).
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        # Insert or overwrite the record under its item key.
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        # Permanently ignore this key in future update_from() responses.
        self._blacklist.add(key)

    def update_record(self, key, item):
        # Replace the tracked item object on an existing record.
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Refresh records from a poll *response*; unseen keys become orphans.

        NOTE(review): the loop header over the response items (and the
        blacklist-skip body) is elided from this excerpt.
        """
        unseen = set(self.nodes.iterkeys())
        key = self.item_key(item)
        if key in self._blacklist:
            self.update_record(key, item)
        # Records not seen in this response are moved aside as orphans.
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    # NOTE(review): the `def` lines for the unpaired()/paired() generator
    # methods are elided from this excerpt; only their bodies are visible.
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)

        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is not None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud node records, keyed by the provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Return a record whose Arvados node info and setup assignment are
        # both stale, i.e. one that is safe to hand to a new setup actor.
        # NOTE(review): the condition's continuation lines and the return
        # statements are elided from this excerpt — confirm upstream.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                not cnode.timestamp_fresh(record.assignment_time,
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
        # NOTE(review): the end of the parameter list — including a
        # max_total_price parameter referenced below — appears elided
        # from this excerpt; confirm upstream.
        super(NodeManagerDaemonActor, self).__init__()
        # Actor classes to instantiate for setup/shutdown/monitoring.
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        # Factories so each worker actor gets its own API client.
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        # Self-proxy for scheduling follow-up messages to this actor.
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        # Subscribe to each poll actor and remember when it last reported.
        # NOTE(review): the `self.last_polls = {}` initializer appears
        # elided from this excerpt.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            # Seed so the first real poll always counts as fresh.
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Arvados node UUID to ComputeNodeSetupActors
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {}  # Actor IDs or Cloud node IDs to node size
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")
154 def _update_poll_time(self, poll_key):
155 self.last_polls[poll_key] = time.time()
157 def _pair_nodes(self, node_record, arvados_node):
158 self._logger.info("Cloud node %s is now paired with Arvados node %s",
159 node_record.cloud_node.name, arvados_node['uuid'])
160 self._arvados_nodes_actor.subscribe_to(
161 arvados_node['uuid'], node_record.actor.update_arvados_node)
162 node_record.arvados_node = arvados_node
163 self.arvados_nodes.add(node_record)
    def _new_node(self, cloud_node):
        """Start a ComputeNodeMonitorActor for *cloud_node* and build its record."""
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            # NOTE(review): one keyword-argument line (likely
            # arvados_node=None) appears elided from this excerpt.
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        # Use a tell-proxy for subscriptions so these calls don't block.
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        # NOTE(review): the trailing `return record` is not visible in
        # this excerpt — confirm upstream.
    def _register_cloud_node(self, node):
        """Create or refresh the tracker record for cloud *node*."""
        rec = self.cloud_nodes.get(node.id)
        # NOTE(review): the `if rec is None:` guard appears elided here.
        self._logger.info("Registering new cloud node %s", node.id)
        record = self._new_node(node)
        self.cloud_nodes.add(record)
        # NOTE(review): the `else:` header appears elided here; refresh
        # the existing record's cloud node object in place.
        rec.cloud_node = node
    def update_cloud_nodes(self, nodelist):
        """Handle a cloud node poll: register new nodes, reap orphans."""
        self._update_poll_time('cloud_nodes')
        for _, node in self.cloud_nodes.update_from(nodelist):
            self._register_cloud_node(node)
        # (intervening lines elided from this excerpt)
        # Nodes that vanished from the cloud listing: stop any in-flight
        # shutdown actors and unlink the cloud node from the record.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                # NOTE(review): the `try:` header (and its `pass` body for
                # the handler) appear elided from this excerpt.
                self.shutdowns[key].stop().get()
                # An already-dead actor has nothing left to stop; ignore.
            except pykka.ActorDeadError:
                del self.shutdowns[key]
                del self.sizes_booting_shutdown[key]
            record.cloud_node = None
215 def _register_arvados_node(self, key, arv_node):
216 self._logger.info("Registering new Arvados node %s", key)
217 record = _ComputeNodeRecord(arvados_node=arv_node)
218 self.arvados_nodes.add(record)
    def update_arvados_nodes(self, nodelist):
        """Handle an Arvados node poll: register newly seen node records."""
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._register_arvados_node(key, node)
        # NOTE(review): trailing lines (possibly a try_pairing() kick)
        # appear elided from this excerpt — confirm upstream.
    def try_pairing(self):
        """Offer each unpaired cloud node to each unpaired Arvados node."""
        for record in self.cloud_nodes.unpaired():
            for arv_rec in self.arvados_nodes.unpaired():
                # The monitor actor decides whether this Arvados node
                # matches its cloud node; .get() blocks on the answer.
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    # NOTE(review): a `break` after a successful pairing
                    # appears elided from this excerpt.
    def _nodes_booting(self, size):
        # Count setup actors booting nodes of this size (None = any size).
        # NOTE(review): the `sum(1` header line appears elided here.
            for c in self.booting.iterkeys()
            if size is None or self.sizes_booting_shutdown[c].id == size.id)

    def _nodes_unpaired(self, size):
        # Count cloud nodes of this size not yet paired with Arvados nodes.
        # NOTE(review): the `sum(1` header line appears elided here.
            for c in self.cloud_nodes.unpaired()
            if size is None or c.cloud_node.size.id == size.id)

    def _nodes_paired(self, size):
        # Count cloud nodes of this size paired with an Arvados node.
        # NOTE(review): the `sum(1` header line appears elided here.
            for c in self.cloud_nodes.paired()
            if size is None or c.cloud_node.size.id == size.id)

    def _nodes_down(self, size):
        # Make sure to iterate over self.cloud_nodes because what we're
        # counting here are compute nodes that are reported by the cloud
        # provider but are considered "down" by Arvados.
        return sum(1 for down in
                   pykka.get_all(rec.actor.in_state('down') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if ((size is None or rec.cloud_node.size.id == size.id) and
                                     rec.cloud_node.id not in self.shutdowns))
        # NOTE(review): the closing filter line (e.g. `if down)`) appears
        # elided from this excerpt.

    def _nodes_up(self, size):
        # Nodes effectively available: booting + unpaired + paired, minus
        # those down or already being shut down.
        up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
        # NOTE(review): the `return up` line appears elided here.

    def _total_price(self):
        # Sum the hourly price of nodes booting plus nodes running.
        # NOTE(review): the `cost = 0` initializer appears elided here.
        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
                    for c in self.booting.iterkeys())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for c in self.cloud_nodes.nodes.itervalues())
        # NOTE(review): the `return cost` line appears elided here.

    def _nodes_busy(self, size):
        # Count nodes of this size whose monitor actor reports 'busy'.
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
        # NOTE(review): the closing filter line (e.g. `if busy)`) appears
        # elided from this excerpt.
280 def _size_wishlist(self, size):
281 return sum(1 for c in self.last_wishlist if c.id == size.id)
    def _size_shutdowns(self, size):
        # Count shutdown actors working on nodes of this size (None = any).
        # NOTE(review): the `sum(1` header line appears elided here.
            for c in self.shutdowns.iterkeys()
            if size is None or self.sizes_booting_shutdown[c].id == size.id)

    def _nodes_wanted(self, size):
        """Decide how many more nodes of *size* to boot (negative = excess).

        Enforces min_nodes, max_nodes, and max_total_price limits.
        NOTE(review): several branch/return lines are elided from this
        excerpt; the visible statements below are only part of the logic.
        """
        total_up_count = self._nodes_up(None) + self._nodes_down(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()
        # NOTE(review): the over_max branch appears elided here.
        elif under_min > 0 and size.id == self.min_cloud_size.id:
        up_count = self._nodes_up(size)
        booting_count = self._nodes_booting(size)
        unpaired_count = self._nodes_unpaired(size)
        paired_count = self._nodes_paired(size)
        busy_count = self._nodes_busy(size)
        down_count = self._nodes_down(size)
        # Paired nodes that are neither busy nor down are idle capacity.
        idle_count = paired_count - (busy_count+down_count)
        shutdown_count = self._size_shutdowns(size)
        self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
                          self._size_wishlist(size),
        # Want enough new nodes to cover the wishlist beyond non-busy ones.
        wanted = self._size_wishlist(size) - (up_count - busy_count)
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            # Cap boots so projected spend stays under max_total_price.
            can_boot = int((self.max_total_price - total_price) / size.price)
            self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                              size.name, size.price, self.max_total_price, total_price)
328 def _nodes_excess(self, size):
329 up_count = self._nodes_up(size)
330 if size.id == self.min_cloud_size.id:
331 up_count -= self.min_nodes
332 return up_count - (self._nodes_busy(size) + self._size_wishlist(size))
    def update_server_wishlist(self, wishlist):
        """Handle a job-queue poll: boot or cancel nodes per the wishlist."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            # NOTE(review): a `try:` header appears elided here (the
            # `except` below has no visible opener).
            nodes_wanted = self._nodes_wanted(size)
            # NOTE(review): the `if nodes_wanted > 0:` guard appears elided.
                self._later.start_node(size)
            elif (nodes_wanted < 0) and self.booting:
                self._later.stop_booting_node(size)
            except Exception as e:
                # Log and continue; one bad size must not stop the sweep.
                self._logger.exception("while calculating nodes wanted for size %s", size)
    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # NOTE(review): the `now = time.time()` line appears elided here.
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            # NOTE(review): the stale-path branch and the trailing
            # `return wrapper` appear elided from this excerpt.
    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Boot one node of *cloud_size* if the wishlist still wants one."""
        nodes_wanted = self._nodes_wanted(cloud_size)
        # NOTE(review): the guard on nodes_wanted appears elided here.
        # Reuse a stale Arvados node record if one is available.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes. Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        # Index the setup actor by its URN so node_up can find and drop it.
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
        if arvados_node is not None:
            # Record when this Arvados node was handed to a setup actor, so
            # find_stale_node won't reassign it immediately.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            # NOTE(review): the assigned value line (likely time.time())
            # appears elided from this excerpt.
        new_setup.subscribe(self._later.node_up)
        # NOTE(review): the `if nodes_wanted > 1:` guard appears elided;
        # re-message ourselves to boot the next node.
        self._later.start_node(cloud_size)
388 def _get_actor_attrs(self, actor, *attr_names):
389 return pykka.get_all([getattr(actor, name) for name in attr_names])
    def node_up(self, setup_proxy):
        # Called when a SetupActor has completed.
        cloud_node, arvados_node = self._get_actor_attrs(
            setup_proxy, 'cloud_node', 'arvados_node')
        # NOTE(review): lines are elided here (likely setup_proxy.stop()).
        # If cloud_node is None then the node create wasn't
        # successful and so there isn't anything to do.
        if cloud_node is not None:
            # Node creation succeeded. Update cloud node list.
            self._register_cloud_node(cloud_node)
        # The setup actor is finished either way: forget its bookkeeping.
        # NOTE(review): original indentation of these dels relative to the
        # `if` is ambiguous in this excerpt — confirm upstream.
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one in-flight boot of *size* when capacity is excess."""
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            # Nothing to cancel.
            # NOTE(review): the guard's `return` body appears elided here.
        for key, node in self.booting.iteritems():
            if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                # The setup actor agreed to stop before creating a node.
                del self.booting[key]
                del self.sizes_booting_shutdown[key]
                # NOTE(review): lines are elided here — a `break` is needed
                # to avoid mutating self.booting while iterating it, and a
                # guard (likely `if nodes_excess > 1:`) before re-messaging.
                self._later.stop_booting_node(size)
    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for the monitored node.

        cancellable=True lets the shutdown abort if the node becomes busy
        again before it is destroyed.
        """
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        if cloud_node_id in self.shutdowns:
            # A shutdown is already in flight for this node.
            # NOTE(review): the guard's `return` body appears elided here.
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        self.shutdowns[cloud_node_id] = shutdown.proxy()
        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
        # Learn when the shutdown finishes so we can clean up bookkeeping.
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        """Decide whether to shut a node down once its monitor deems it eligible."""
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            # More of this size than demanded: shut one down, but allow
            # cancellation in case it picks up work meanwhile.
            self._begin_node_shutdown(node_actor, cancellable=True)
        elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
            # Node is unpaired, which means it probably exceeded its booting
            # grace period without a ping, so shut it down so we can boot a new
            # node in its place.
            self._begin_node_shutdown(node_actor, cancellable=False)
        elif node_actor.in_state('down').get():
            # Node is down and unlikely to come back.
            self._begin_node_shutdown(node_actor, cancellable=False)
    def node_finished_shutdown(self, shutdown_actor):
        """Clean up after a shutdown actor reports completion or cancellation."""
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        cloud_node_id = cloud_node.id
        shutdown_actor.stop()
        # NOTE(review): an `if not success:` guard appears elided here; the
        # cleanup below presumably runs only for failed/cancelled shutdowns.
        if cancel_reason == self._node_shutdown.NODE_BROKEN:
            # Broken node: blacklist so we never pair or shut it down again.
            self.cloud_nodes.blacklist(cloud_node_id)
        del self.shutdowns[cloud_node_id]
        del self.sizes_booting_shutdown[cloud_node_id]
        # On success, we want to leave the entry in self.shutdowns so that it
        # won't try to shut down the node again. It should disappear from the
        # cloud node list, and the entry in self.shutdowns will get cleaned up
        # by update_cloud_nodes.
        # NOTE(review): this is the body of the daemon's signal-shutdown
        # handler; its `def shutdown(self):` line is elided from this excerpt.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        # Ask every setup actor to stop; keep only the ones already
        # committed to a cloud node (stop_if_no_cloud_node() -> False).
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()
    def await_shutdown(self):
        # Poll once per second until outstanding setup actors finish.
        # NOTE(review): the guard line (likely `if self.booting:`) and the
        # else-branch (stopping this actor) appear elided from this excerpt.
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)