from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track compute node records keyed by cloud provider node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the tracking key for cloud_node: its provider ID."""
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track compute node records keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a record whose Arvados node record is stale, or None.

        A record is stale when its node has not been modified within
        stale_time seconds AND it has not been assigned to a booting
        cloud node within stale_time seconds.
        """
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                  not cnode.timestamp_fresh(record.assignment_time,
                                            stale_time)):
                return record
        return None
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
123 self._cloud_driver = self._new_cloud()
124 self._later = self.actor_ref.tell_proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.max_total_price = max_total_price
131 self.poll_stale_after = poll_stale_after
132 self.boot_fail_after = boot_fail_after
133 self.node_stale_after = node_stale_after
135 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136 poll_actor = locals()[poll_name + '_actor']
137 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139 self.last_polls[poll_name] = -self.poll_stale_after
140 self.cloud_nodes = _CloudNodeTracker()
141 self.arvados_nodes = _ArvadosNodeTracker()
142 self.booting = {} # Actor IDs to ComputeNodeSetupActors
143 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
144 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
145 self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
148 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149 self._logger.debug("Daemon started")
151 def _update_poll_time(self, poll_key):
152 self.last_polls[poll_key] = time.time()
154 def _pair_nodes(self, node_record, arvados_node):
155 self._logger.info("Cloud node %s is now paired with Arvados node %s",
156 node_record.cloud_node.name, arvados_node['uuid'])
157 self._arvados_nodes_actor.subscribe_to(
158 arvados_node['uuid'], node_record.actor.update_arvados_node)
159 node_record.arvados_node = arvados_node
160 self.arvados_nodes.add(node_record)
162 def _new_node(self, cloud_node):
163 start_time = self._cloud_driver.node_start_time(cloud_node)
164 shutdown_timer = cnode.ShutdownTimer(start_time,
165 self.shutdown_windows)
166 actor = self._node_actor.start(
167 cloud_node=cloud_node,
168 cloud_node_start_time=start_time,
169 shutdown_timer=shutdown_timer,
170 cloud_fqdn_func=self._cloud_driver.node_fqdn,
171 update_actor=self._cloud_updater,
172 timer_actor=self._timer,
174 poll_stale_after=self.poll_stale_after,
175 node_stale_after=self.node_stale_after,
176 cloud_client=self._cloud_driver,
177 boot_fail_after=self.boot_fail_after)
178 actorTell = actor.tell_proxy()
179 actorTell.subscribe(self._later.node_can_shutdown)
180 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
181 actorTell.update_cloud_node)
182 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
185 def update_cloud_nodes(self, nodelist):
186 self._update_poll_time('cloud_nodes')
187 for key, node in self.cloud_nodes.update_from(nodelist):
188 self._logger.info("Registering new cloud node %s", key)
189 if key in self.booted:
190 record = self.booted.pop(key)
192 record = self._new_node(node)
193 self.cloud_nodes.add(record)
194 for arv_rec in self.arvados_nodes.unpaired():
195 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
196 self._pair_nodes(record, arv_rec.arvados_node)
198 for key, record in self.cloud_nodes.orphans.iteritems():
199 if key in self.shutdowns:
201 self.shutdowns[key].stop().get()
202 except pykka.ActorDeadError:
204 del self.shutdowns[key]
205 del self.sizes_booting_shutdown[key]
207 record.cloud_node = None
209 def update_arvados_nodes(self, nodelist):
210 self._update_poll_time('arvados_nodes')
211 for key, node in self.arvados_nodes.update_from(nodelist):
212 self._logger.info("Registering new Arvados node %s", key)
213 record = _ComputeNodeRecord(arvados_node=node)
214 self.arvados_nodes.add(record)
215 for arv_rec in self.arvados_nodes.unpaired():
216 arv_node = arv_rec.arvados_node
217 for cloud_rec in self.cloud_nodes.unpaired():
218 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
219 self._pair_nodes(cloud_rec, arv_node)
222 def _nodes_booting(self, size):
224 for c in self.booting.iterkeys()
225 if size is None or self.sizes_booting_shutdown[c].id == size.id)
227 for c in self.booted.itervalues()
228 if size is None or c.cloud_node.size.id == size.id)
231 def _nodes_unpaired(self, size):
233 for c in self.cloud_nodes.unpaired()
234 if size is None or c.cloud_node.size.id == size.id)
236 def _nodes_booted(self, size):
238 for c in self.cloud_nodes.nodes.itervalues()
239 if size is None or c.cloud_node.size.id == size.id)
241 def _nodes_down(self, size):
242 # Make sure to iterate over self.cloud_nodes because what we're
243 # counting here are compute nodes that are reported by the cloud
244 # provider but are considered "down" by Arvados.
245 return sum(1 for down in
246 pykka.get_all(rec.actor.in_state('down') for rec in
247 self.cloud_nodes.nodes.itervalues()
248 if size is None or rec.cloud_node.size.id == size.id)
251 def _nodes_up(self, size):
252 up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
255 def _total_price(self):
257 cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
258 for c in self.booting.iterkeys())
259 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
260 for i in (self.booted, self.cloud_nodes.nodes)
261 for c in i.itervalues())
264 def _nodes_busy(self, size):
265 return sum(1 for busy in
266 pykka.get_all(rec.actor.in_state('busy') for rec in
267 self.cloud_nodes.nodes.itervalues()
268 if rec.cloud_node.size.id == size.id)
    def _nodes_missing(self, size):
        # Count cloud nodes of this size whose Arvados node record has
        # gone missing (per arvados_node_missing with node_stale_after),
        # excluding nodes that are already being shut down.
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
278 def _size_wishlist(self, size):
279 return sum(1 for c in self.last_wishlist if c.id == size.id)
281 def _size_shutdowns(self, size):
283 for c in self.shutdowns.iterkeys():
285 if self.sizes_booting_shutdown[c].id == size.id:
287 except pykka.ActorDeadError:
291 def _nodes_wanted(self, size):
292 total_up_count = self._nodes_up(None)
293 under_min = self.min_nodes - total_up_count
294 over_max = total_up_count - self.max_nodes
295 total_price = self._total_price()
299 elif under_min > 0 and size.id == self.min_cloud_size.id:
302 booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
303 shutdown_count = self._size_shutdowns(size)
304 busy_count = self._nodes_busy(size)
305 up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
307 self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
308 self._size_wishlist(size),
309 up_count + busy_count,
311 up_count - booting_count,
315 wanted = self._size_wishlist(size) - up_count
316 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
317 can_boot = int((self.max_total_price - total_price) / size.price)
319 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
320 size.name, size.price, self.max_total_price, total_price)
325 def _nodes_excess(self, size):
326 up_count = self._nodes_up(size) - self._size_shutdowns(size)
327 if size.id == self.min_cloud_size.id:
328 up_count -= self.min_nodes
329 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
331 def update_server_wishlist(self, wishlist):
332 self._update_poll_time('server_wishlist')
333 self.last_wishlist = wishlist
334 for size in reversed(self.server_calculator.cloud_sizes):
336 nodes_wanted = self._nodes_wanted(size)
338 self._later.start_node(size)
339 elif (nodes_wanted < 0) and self.booting:
340 self._later.stop_booting_node(size)
341 except Exception as e:
342 self._logger.exception("while calculating nodes wanted for size %s", size)
344 def _check_poll_freshness(orig_func):
345 """Decorator to inhibit a method when poll information is stale.
347 This decorator checks the timestamps of all the poll information the
348 daemon has received. The decorated method is only called if none
349 of the timestamps are considered stale.
351 @functools.wraps(orig_func)
352 def wrapper(self, *args, **kwargs):
354 if all(now - t < self.poll_stale_after
355 for t in self.last_polls.itervalues()):
356 return orig_func(self, *args, **kwargs)
361 @_check_poll_freshness
362 def start_node(self, cloud_size):
363 nodes_wanted = self._nodes_wanted(cloud_size)
366 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
367 self._logger.info("Want %i more %s nodes. Booting a node.",
368 nodes_wanted, cloud_size.name)
369 new_setup = self._node_setup.start(
370 timer_actor=self._timer,
371 arvados_client=self._new_arvados(),
372 arvados_node=arvados_node,
373 cloud_client=self._new_cloud(),
374 cloud_size=cloud_size).proxy()
375 self.booting[new_setup.actor_ref.actor_urn] = new_setup
376 self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
378 if arvados_node is not None:
379 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
381 new_setup.subscribe(self._later.node_up)
383 self._later.start_node(cloud_size)
385 def _get_actor_attrs(self, actor, *attr_names):
386 return pykka.get_all([getattr(actor, name) for name in attr_names])
388 def node_up(self, setup_proxy):
389 cloud_node = setup_proxy.cloud_node.get()
390 del self.booting[setup_proxy.actor_ref.actor_urn]
391 del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
394 if cloud_node is not None:
395 record = self.cloud_nodes.get(cloud_node.id)
397 record = self._new_node(cloud_node)
398 self.booted[cloud_node.id] = record
399 self._timer.schedule(time.time() + self.boot_fail_after,
400 self._later.shutdown_unpaired_node, cloud_node.id)
402 @_check_poll_freshness
403 def stop_booting_node(self, size):
404 nodes_excess = self._nodes_excess(size)
405 if (nodes_excess < 1) or not self.booting:
407 for key, node in self.booting.iteritems():
408 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
409 del self.booting[key]
410 del self.sizes_booting_shutdown[key]
413 self._later.stop_booting_node(size)
416 def _begin_node_shutdown(self, node_actor, cancellable):
417 cloud_node_obj = node_actor.cloud_node.get()
418 cloud_node_id = cloud_node_obj.id
419 if cloud_node_id in self.shutdowns:
421 shutdown = self._node_shutdown.start(
422 timer_actor=self._timer, cloud_client=self._new_cloud(),
423 arvados_client=self._new_arvados(),
424 node_monitor=node_actor.actor_ref, cancellable=cancellable)
425 self.shutdowns[cloud_node_id] = shutdown.proxy()
426 self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
427 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
429 @_check_poll_freshness
430 def node_can_shutdown(self, node_actor):
431 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
432 self._begin_node_shutdown(node_actor, cancellable=True)
434 def shutdown_unpaired_node(self, cloud_node_id):
435 for record_dict in [self.cloud_nodes, self.booted]:
436 if cloud_node_id in record_dict:
437 record = record_dict[cloud_node_id]
441 if not record.actor.in_state('idle', 'busy').get():
442 self._begin_node_shutdown(record.actor, cancellable=False)
444 def node_finished_shutdown(self, shutdown_actor):
445 cloud_node, success, cancel_reason = self._get_actor_attrs(
446 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
447 shutdown_actor.stop()
448 cloud_node_id = cloud_node.id
450 if cancel_reason == self._node_shutdown.NODE_BROKEN:
451 self.cloud_nodes.blacklist(cloud_node_id)
452 elif cloud_node_id in self.booted:
453 self.booted.pop(cloud_node_id).actor.stop()
454 del self.shutdowns[cloud_node_id]
455 del self.sizes_booting_shutdown[cloud_node_id]
458 self._logger.info("Shutting down after signal.")
459 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
460 setup_stops = {key: node.stop_if_no_cloud_node()
461 for key, node in self.booting.iteritems()}
462 self.booting = {key: self.booting[key]
463 for key in setup_stops if not setup_stops[key].get()}
464 self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check once per second until shutdown can complete.
        # NOTE(review): the source view is truncated here — the condition
        # deciding between rescheduling and actually stopping the actor
        # is not visible; only the reschedule call is shown. Verify the
        # full body against VCS history.
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)