3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
# Record bundling what the daemon knows about one compute node: its cloud
# node, its Arvados node, and an assignment timestamp.
# NOTE(review): embedded line 18 is not part of this listing. Other code reads
# `record.actor`, so that line presumably stores the `actor` argument - confirm.
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
# Defaults to negative infinity when no assignment time is supplied.
21 self.assignment_time = assignment_time
# Base class for the dict-backed node trackers. Subclasses supply RECORD_ATTR,
# PAIR_ATTR and item_key, which the methods here read through `self`.
# NOTE(review): embedded lines 25-27, 36, 55, 58-60, 62-63 and 65-66 are absent
# from this listing. That includes the code that creates self.nodes, the
# `return` of _proxy_method, the loop header of update_from, and the header of
# the method that owns the final `return` (callers use an `unpaired()` method,
# so that is presumably it - confirm).
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
# Fetch the unbound dict method so the wrapper can apply it to self.nodes.
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
# Install one wrapper per listed dict method by writing into the namespace
# returned by locals() at this point.
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
# Return the key for `record`: item_key applied to its RECORD_ATTR attribute.
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
# Store `record` in self.nodes under its own key, replacing any existing entry.
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
# Add `key` to the blacklist set that update_from consults.
47 def blacklist(self, key):
48 self._blacklist.add(key)
# Overwrite the RECORD_ATTR attribute of the record already stored at `key`.
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
# Reconcile the tracked records against a poll `response`.
# Every currently tracked key starts out in `unseen`; whatever is still in
# `unseen` at the end is popped from self.nodes into self.orphans.
# NOTE(review): the lines that iterate `response`, handle blacklisted keys,
# shrink `unseen` and produce results are not visible. Callers iterate the
# result as (key, item) pairs.
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
# Lazily yields the tracked records whose PAIR_ATTR attribute is None.
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker for records keyed by the ``id`` of their cloud node.

    The tracked item lives in each record's ``cloud_node`` attribute and the
    record counts as paired once its ``arvados_node`` attribute is set.
    """

    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the tracker key for ``cloud_node`` (its ``id``)."""
        return cloud_node.id
# Tracker for records keyed by the 'uuid' entry of their Arvados node.
# The tracked item is each record's `arvados_node`; pairing is judged by
# `cloud_node`.
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78 RECORD_ATTR = 'arvados_node'
79 PAIR_ATTR = 'cloud_node'
80 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
# Scan the tracked records, testing each with cnode.timestamp_fresh on both the
# Arvados node's mtime and the record's assignment_time.
# NOTE(review): embedded lines 86 and 88-90 are absent, so the second argument
# of each freshness check and the value returned are not visible here. The
# caller (start_node) compares the result against None.
82 def find_stale_node(self, stale_time):
83 for record in self.nodes.itervalues():
84 node = record.arvados_node
85 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87 not cnode.timestamp_fresh(record.assignment_time,
# NOTE(review): the end of this docstring (embedded lines 101-102) is absent
# from this listing, so the last sentence below is cut off mid-phrase here.
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
100 demand, and stops the corresponding ComputeNode actors when their work
# Set up the daemon: store the collaborating actors, factories and limits,
# build a cloud driver from cloud_factory, subscribe the update_* methods to
# the three poll actors, and create the trackers and bookkeeping dicts.
# NOTE(review): embedded lines 114, 134 and 146-147 are absent from this
# listing. `max_total_price` is read below but is not in the visible part of
# the signature, and `self.last_polls` is written below but its creation is not
# visible - both presumably live on the missing lines; confirm.
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
123 self._cloud_driver = self._new_cloud()
# tell_proxy to this actor itself; used throughout the class to queue follow-up
# calls and to hand out callbacks.
124 self._later = self.actor_ref.tell_proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.max_total_price = max_total_price
131 self.poll_stale_after = poll_stale_after
132 self.boot_fail_after = boot_fail_after
133 self.node_stale_after = node_stale_after
# For each poll: look the matching *_actor argument up by name, subscribe the
# matching update_* method, and keep the actor as self._<poll>_actor.
135 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136 poll_actor = locals()[poll_name + '_actor']
137 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
# Seeding with -poll_stale_after makes the freshness check in
# _check_poll_freshness fail until the first real update arrives.
139 self.last_polls[poll_name] = -self.poll_stale_after
140 self.cloud_nodes = _CloudNodeTracker()
141 self.arvados_nodes = _ArvadosNodeTracker()
142 self.booting = {} # Actor IDs to ComputeNodeSetupActors
143 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
144 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
145 self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
# The logger name is the class name plus the actor URN with its first 33
# characters dropped (presumably to shorten it - confirm the URN format).
148 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149 self._logger.debug("Daemon started")
151 def _update_poll_time(self, poll_key):
152 self.last_polls[poll_key] = time.time()
154 def _pair_nodes(self, node_record, arvados_node):
155 self._logger.info("Cloud node %s is now paired with Arvados node %s",
156 node_record.cloud_node.name, arvados_node['uuid'])
157 self._arvados_nodes_actor.subscribe_to(
158 arvados_node['uuid'], node_record.actor.update_arvados_node)
159 node_record.arvados_node = arvados_node
160 self.arvados_nodes.add(node_record)
# Start a monitor actor for `cloud_node` and wrap it in a _ComputeNodeRecord.
# The new actor gets the daemon's node_can_shutdown as a subscriber, and its
# update_cloud_node is subscribed to cloud-node poll updates for this node id.
# NOTE(review): embedded lines 173 and 183 are absent from this listing (one
# keyword argument of the start() call, and the end of the method). Callers
# use the result as a record, so line 183 presumably returns `record`.
162 def _new_node(self, cloud_node):
163 start_time = self._cloud_driver.node_start_time(cloud_node)
164 shutdown_timer = cnode.ShutdownTimer(start_time,
165 self.shutdown_windows)
166 actor = self._node_actor.start(
167 cloud_node=cloud_node,
168 cloud_node_start_time=start_time,
169 shutdown_timer=shutdown_timer,
170 cloud_fqdn_func=self._cloud_driver.node_fqdn,
171 update_actor=self._cloud_updater,
172 timer_actor=self._timer,
174 poll_stale_after=self.poll_stale_after,
175 node_stale_after=self.node_stale_after,
176 cloud_client=self._cloud_driver,
177 boot_fail_after=self.boot_fail_after)
178 actorTell = actor.tell_proxy()
179 actorTell.subscribe(self._later.node_can_shutdown)
180 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
181 actorTell.update_cloud_node)
# The record holds a full proxy (actor.proxy()), not the tell_proxy used above.
182 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
# Handle a fresh cloud node list from the cloud node poll.
# Each (key, node) pair yielded by the tracker is registered: a record is
# taken from self.booted when the key is there, otherwise built with
# _new_node. The record is then added to the tracker and offered unpaired
# Arvados nodes. Afterwards the tracker's orphans are processed: a pending
# shutdown actor for the key is stopped and its bookkeeping entries deleted.
# NOTE(review): embedded lines 191, 197, 200, 203 and 206 are absent from this
# listing (presumably else/try/except bodies and loop control), so the exact
# branching cannot be confirmed from here.
185 def update_cloud_nodes(self, nodelist):
186 self._update_poll_time('cloud_nodes')
187 for key, node in self.cloud_nodes.update_from(nodelist):
188 self._logger.info("Registering new cloud node %s", key)
189 if key in self.booted:
190 record = self.booted.pop(key)
192 record = self._new_node(node)
193 self.cloud_nodes.add(record)
194 for arv_rec in self.arvados_nodes.unpaired():
# .get() blocks until the monitor actor answers the pairing offer.
195 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
196 self._pair_nodes(record, arv_rec.arvados_node)
198 for key, record in self.cloud_nodes.orphans.iteritems():
199 if key in self.shutdowns:
201 self.shutdowns[key].stop().get()
202 except pykka.ActorDeadError:
204 del self.shutdowns[key]
205 del self.sizes_booting_shutdown[key]
207 record.cloud_node = None
# Handle a fresh Arvados node list from the Arvados node poll.
# Each (key, node) pair yielded by the tracker gets a new record holding only
# the Arvados node. Then each unpaired Arvados record is offered to the
# unpaired cloud records, and accepted offers are paired via _pair_nodes.
# NOTE(review): embedded line 220 is absent from this listing; it presumably
# ends the inner loop after a successful pairing - confirm.
209 def update_arvados_nodes(self, nodelist):
210 self._update_poll_time('arvados_nodes')
211 for key, node in self.arvados_nodes.update_from(nodelist):
212 self._logger.info("Registering new Arvados node %s", key)
213 record = _ComputeNodeRecord(arvados_node=node)
214 self.arvados_nodes.add(record)
215 for arv_rec in self.arvados_nodes.unpaired():
216 arv_node = arv_rec.arvados_node
217 for cloud_rec in self.cloud_nodes.unpaired():
# .get() blocks until the monitor actor answers the pairing offer.
218 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
219 self._pair_nodes(cloud_rec, arv_node)
# Works over two sets: setup actors in self.booting (size looked up in
# self.sizes_booting_shutdown) and records in self.booted (size read from the
# cloud node). A `size` of None disables the size filter.
# NOTE(review): embedded lines 223, 226 and 229 are absent from this listing;
# they presumably wrap these generators in sums and return the total, since
# _nodes_up adds this method's result to another count - confirm.
222 def _nodes_booting(self, size):
224 for c in self.booting.iterkeys()
225 if size is None or self.sizes_booting_shutdown[c].id == size.id)
227 for c in self.booted.itervalues()
228 if size is None or c.cloud_node.size.id == size.id)
# Works over the unpaired cloud node records, optionally filtered to `size`
# (None disables the filter).
# NOTE(review): embedded line 232 is absent from this listing; it presumably
# opens a counting expression around this generator - confirm.
231 def _nodes_unpaired(self, size):
233 for c in self.cloud_nodes.unpaired()
234 if size is None or c.cloud_node.size.id == size.id)
# Works over every record in the cloud node tracker, optionally filtered to
# `size` (None disables the filter).
# NOTE(review): embedded line 237 is absent from this listing; it presumably
# opens a counting expression around this generator - confirm.
236 def _nodes_booted(self, size):
238 for c in self.cloud_nodes.nodes.itervalues()
239 if size is None or c.cloud_node.size.id == size.id)
# Combine the booting and booted counts for `size`.
# NOTE(review): embedded line 243 is absent from this listing; callers do
# arithmetic on the result, so it presumably returns `up` - confirm.
241 def _nodes_up(self, size):
242 up = self._nodes_booting(size) + self._nodes_booted(size)
# Accumulate the price of every node the daemon accounts for. Setup actors in
# self.booting are priced through self.sizes_booting_shutdown; records in
# self.booted and in the cloud node tracker are priced by their cloud node's
# size. Each price comes from server_calculator.find_size(<size id>).price.
# NOTE(review): embedded lines 246 and 252 are absent from this listing; they
# presumably initialise `cost` and return it - confirm.
245 def _total_price(self):
247 cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
248 for c in self.booting.iterkeys())
249 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
250 for i in (self.booted, self.cloud_nodes.nodes)
251 for c in i.itervalues())
# Ask the monitor actor of every tracked cloud node of `size` whether it is in
# the 'busy' state, gather the answers with pykka.get_all, and count over them.
# Unlike the other counters, `size` is dereferenced unconditionally here.
# NOTE(review): embedded line 259 is absent from this listing; it presumably
# holds the filter that keeps only truthy answers - confirm.
254 def _nodes_busy(self, size):
255 return sum(1 for busy in
256 pykka.get_all(rec.actor.in_state('busy') for rec in
257 self.cloud_nodes.nodes.itervalues()
258 if rec.cloud_node.size.id == size.id)
def _nodes_missing(self, size):
    """Count tracked cloud nodes of ``size`` whose Arvados node is missing.

    Only records whose cloud node has the given size and is not already in
    ``self.shutdowns`` are considered. A node is counted when its monitor
    actor reports a truthy Arvados node that
    ``cnode.arvados_node_missing`` flags for ``self.node_stale_after``.
    """
    def _is_candidate(rec):
        # Cheap size comparison first; the shutdown check needs a blocking
        # round-trip to the monitor actor.
        if rec.cloud_node.size.id != size.id:
            return False
        return rec.actor.cloud_node.get().id not in self.shutdowns

    pending = (rec.actor.arvados_node
               for rec in self.cloud_nodes.nodes.itervalues()
               if _is_candidate(rec))
    missing = 0
    for arv_node in pykka.get_all(pending):
        if arv_node and cnode.arvados_node_missing(arv_node,
                                                   self.node_stale_after):
            missing += 1
    return missing
268 def _size_wishlist(self, size):
269 return sum(1 for c in self.last_wishlist if c.id == size.id)
# Walk the keys of self.shutdowns and compare the size recorded for each in
# self.sizes_booting_shutdown against `size`, tolerating pykka.ActorDeadError.
# NOTE(review): embedded lines 272, 274, 276 and 278-279 are absent from this
# listing, so the counter, the try body and the return value are not visible.
# Callers subtract the result from node counts, so it presumably returns the
# number of matching shutdowns - confirm.
271 def _size_shutdowns(self, size):
273 for c in self.shutdowns.iterkeys():
275 if self.sizes_booting_shutdown[c].id == size.id:
277 except pykka.ActorDeadError:
# Work out how many more nodes of `size` should be running.
# Visible inputs: the total up count versus min_nodes/max_nodes, the current
# total price, and per-size booting/unpaired, shutdown, busy and missing
# counts. The per-size shortfall is the wishlist count minus `up_count`.
# When max_total_price is set and booting `wanted` more would exceed it,
# `can_boot` is the number of nodes that still fit in the remaining budget.
# NOTE(review): embedded lines 286-288, 290-291, 296, 300, 302-304, 308 and
# 311-313 are absent from this listing, so the over_max/under_min branches,
# two of the logged values and every return statement cannot be confirmed.
281 def _nodes_wanted(self, size):
282 total_up_count = self._nodes_up(None)
283 under_min = self.min_nodes - total_up_count
284 over_max = total_up_count - self.max_nodes
285 total_price = self._total_price()
289 elif under_min > 0 and size.id == self.min_cloud_size.id:
292 booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
293 shutdown_count = self._size_shutdowns(size)
294 busy_count = self._nodes_busy(size)
# up_count excludes nodes that are shutting down, busy, or missing.
295 up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
297 self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
298 self._size_wishlist(size),
299 up_count + busy_count,
301 up_count - booting_count,
305 wanted = self._size_wishlist(size) - up_count
306 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
307 can_boot = int((self.max_total_price - total_price) / size.price)
309 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
310 size.name, size.price, self.max_total_price, total_price)
315 def _nodes_excess(self, size):
316 up_count = self._nodes_up(size) - self._size_shutdowns(size)
317 if size.id == self.min_cloud_size.id:
318 up_count -= self.min_nodes
319 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
# Handle a fresh server wishlist from the job queue poll.
# Stores the wishlist, then walks server_calculator.cloud_sizes in reverse
# order. For each size it computes _nodes_wanted and queues either start_node
# or, when the count is negative and something is booting, stop_booting_node.
# Any exception raised for a size is logged with a traceback and not re-raised.
# NOTE(review): embedded lines 325 and 327 are absent from this listing
# (presumably the `try:` and the condition guarding start_node) - confirm.
321 def update_server_wishlist(self, wishlist):
322 self._update_poll_time('server_wishlist')
323 self.last_wishlist = wishlist
324 for size in reversed(self.server_calculator.cloud_sizes):
326 nodes_wanted = self._nodes_wanted(size)
# Going through self._later queues the call on this actor rather than running
# it inline.
328 self._later.start_node(size)
329 elif (nodes_wanted < 0) and self.booting:
330 self._later.stop_booting_node(size)
331 except Exception as e:
332 self._logger.exception("while calculating nodes wanted for size %s", size)
# Decorator for daemon methods. The wrapper calls the wrapped method only when
# every timestamp in self.last_polls is less than self.poll_stale_after
# seconds older than `now`.
# NOTE(review): embedded lines 336, 340, 343 and 347-349 are absent from this
# listing. The docstring's closing quotes, the assignment of `now`, the stale
# branch and the decorator's own return are therefore not visible here.
334 def _check_poll_freshness(orig_func):
335 """Decorator to inhibit a method when poll information is stale.
337 This decorator checks the timestamps of all the poll information the
338 daemon has received. The decorated method is only called if none
339 of the timestamps are considered stale.
341 @functools.wraps(orig_func)
342 def wrapper(self, *args, **kwargs):
344 if all(now - t < self.poll_stale_after
345 for t in self.last_polls.itervalues()):
346 return orig_func(self, *args, **kwargs)
# Boot one node of `cloud_size`, provided poll data is fresh (see decorator).
# Re-computes _nodes_wanted, picks a stale Arvados node record to reuse (may
# be None), starts a setup actor and registers it, with its size, under the
# actor URN. The daemon's node_up is subscribed to the setup actor, and
# another start_node call is queued.
# NOTE(review): embedded lines 354-355, 367, 370 and 372 are absent from this
# listing, so the early-exit check, the value written to assignment_time and
# the condition guarding the re-queue are not visible.
351 @_check_poll_freshness
352 def start_node(self, cloud_size):
353 nodes_wanted = self._nodes_wanted(cloud_size)
356 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
357 self._logger.info("Want %i more %s nodes. Booting a node.",
358 nodes_wanted, cloud_size.name)
# Each setup actor gets its own Arvados and cloud clients from the factories.
359 new_setup = self._node_setup.start(
360 timer_actor=self._timer,
361 arvados_client=self._new_arvados(),
362 arvados_node=arvados_node,
363 cloud_client=self._new_cloud(),
364 cloud_size=cloud_size).tell_proxy()
365 self.booting[new_setup.actor_ref.actor_urn] = new_setup
366 self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
368 if arvados_node is not None:
369 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
371 new_setup.subscribe(self._later.node_up)
373 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes of ``actor`` and resolve them together.

    Each name in ``attr_names`` is read from ``actor`` with ``getattr``; the
    collected values are passed to ``pykka.get_all`` and its result is
    returned, in the same order as the names.
    """
    pending = []
    for attr_name in attr_names:
        pending.append(getattr(actor, attr_name))
    return pykka.get_all(pending)
# Callback for a setup actor that has finished (subscribed in start_node).
# Removes the setup actor from the booting bookkeeping. When it produced a
# cloud node, the record is looked up in the cloud node tracker or created
# with _new_node and stored in self.booted. A shutdown_unpaired_node call for
# that node is scheduled boot_fail_after seconds from now.
# NOTE(review): embedded lines 382-383 and 386 are absent from this listing
# (presumably stopping the setup actor and an `if record is None:` guard), so
# the exact nesting of the last statements cannot be confirmed.
378 def node_up(self, setup_proxy):
# .get() blocks until the setup actor returns its cloud node.
379 cloud_node = setup_proxy.cloud_node.get()
380 del self.booting[setup_proxy.actor_ref.actor_urn]
381 del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
384 if cloud_node is not None:
385 record = self.cloud_nodes.get(cloud_node.id)
387 record = self._new_node(cloud_node)
388 self.booted[cloud_node.id] = record
389 self._timer.schedule(time.time() + self.boot_fail_after,
390 self._later.shutdown_unpaired_node, cloud_node.id)
# Cancel a booting node of `size` when there is excess capacity, provided poll
# data is fresh (see decorator).
# Looks for a setup actor of the given size that agrees to stop (no cloud node
# yet) and drops its bookkeeping entries. Another stop_booting_node call may be
# queued afterwards.
# NOTE(review): embedded lines 396, 401-402 and 404 are absent from this
# listing. The entries are deleted from self.booting while it is being
# iterated, which is only safe if the loop exits right after the deletion -
# presumably one of the missing lines is a `break`; confirm.
392 @_check_poll_freshness
393 def stop_booting_node(self, size):
394 nodes_excess = self._nodes_excess(size)
395 if (nodes_excess < 1) or not self.booting:
397 for key, node in self.booting.iteritems():
# Both .get() calls block on the setup actor.
398 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
399 del self.booting[key]
400 del self.sizes_booting_shutdown[key]
403 self._later.stop_booting_node(size)
# Start a shutdown actor for the cloud node monitored by `node_actor`.
# The shutdown proxy and the node's size are recorded under the cloud node id,
# and the daemon's node_finished_shutdown is subscribed to the shutdown actor.
# `cancellable` is passed straight through to the shutdown actor.
# NOTE(review): embedded line 410 is absent from this listing; it presumably
# returns early when a shutdown for this node is already registered - confirm.
406 def _begin_node_shutdown(self, node_actor, cancellable):
# .get() blocks until the monitor actor returns its cloud node.
407 cloud_node_obj = node_actor.cloud_node.get()
408 cloud_node_id = cloud_node_obj.id
409 if cloud_node_id in self.shutdowns:
411 shutdown = self._node_shutdown.start(
412 timer_actor=self._timer, cloud_client=self._new_cloud(),
413 arvados_client=self._new_arvados(),
414 node_monitor=node_actor.actor_ref, cancellable=cancellable)
415 self.shutdowns[cloud_node_id] = shutdown.proxy()
416 self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
417 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin a cancellable shutdown of ``node_actor``'s node if it is excess.

    The node's size is read from the monitor actor; a shutdown is started
    only when ``_nodes_excess`` for that size is positive.
    """
    node_size = node_actor.cloud_node.get().size
    excess = self._nodes_excess(node_size)
    if excess > 0:
        self._begin_node_shutdown(node_actor, cancellable=True)
# Timer callback scheduled by node_up. Finds the record for `cloud_node_id` in
# the cloud node tracker or in self.booted, then begins a non-cancellable
# shutdown unless the monitor actor reports the 'idle' or 'busy' state.
# NOTE(review): embedded lines 428-430 are absent from this listing
# (presumably loop exit and a not-found return) - confirm.
424 def shutdown_unpaired_node(self, cloud_node_id):
425 for record_dict in [self.cloud_nodes, self.booted]:
426 if cloud_node_id in record_dict:
427 record = record_dict[cloud_node_id]
431 if not record.actor.in_state('idle', 'busy').get():
432 self._begin_node_shutdown(record.actor, cancellable=False)
# Callback for a shutdown actor that has finished (subscribed in
# _begin_node_shutdown). Reads the actor's cloud_node, success and
# cancel_reason, stops the actor, then updates bookkeeping: a node cancelled
# as NODE_BROKEN is blacklisted in the cloud node tracker; a node found in
# self.booted has its record removed and its monitor actor stopped; the
# shutdown entries for the node id are deleted.
# NOTE(review): embedded lines 439 and 446 are absent from this listing
# (presumably a branch on `success`), so which statements run on the success
# path versus the cancelled path cannot be confirmed from here.
434 def node_finished_shutdown(self, shutdown_actor):
435 cloud_node, success, cancel_reason = self._get_actor_attrs(
436 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
437 shutdown_actor.stop()
438 cloud_node_id = cloud_node.id
440 if cancel_reason == self._node_shutdown.NODE_BROKEN:
441 self.cloud_nodes.blacklist(cloud_node_id)
442 elif cloud_node_id in self.booted:
443 self.booted.pop(cloud_node_id).actor.stop()
444 del self.shutdowns[cloud_node_id]
445 del self.sizes_booting_shutdown[cloud_node_id]
# NOTE(review): the `def` line of this method (embedded line 447) is absent
# from this listing; the name and parameters must be confirmed in the file.
# Visible behaviour: log the shutdown, inhibit the freshness-guarded methods,
# ask every booting setup actor to stop if it has no cloud node yet, keep in
# self.booting only those that did not stop, then queue await_shutdown.
448 self._logger.info("Shutting down after signal.")
449 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
# Fire all stop requests first, then collect the answers in the next statement.
450 setup_stops = {key: node.stop_if_no_cloud_node()
451 for key, node in self.booting.iteritems()}
452 self.booting = {key: self.booting[key]
453 for key in setup_stops if not setup_stops[key].get()}
454 self._later.await_shutdown()
# Re-schedules itself through the timer actor one second from now.
# NOTE(review): embedded line 457 is absent from this listing and the method
# may continue past its end. The condition that ends the polling (presumably
# that self.booting is empty) and what happens then are not visible - confirm.
456 def await_shutdown(self):
458 self._timer.schedule(time.time() + 1, self._later.await_shutdown)