3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
def record_key(self, record):
    """Return the key under which `record` is tracked.

    The tracked item is read from the record attribute named by
    self.RECORD_ATTR, and self.item_key turns that item into the key.
    """
    make_key = self.item_key
    tracked_item = getattr(record, self.RECORD_ATTR)
    return make_key(tracked_item)
def add(self, record):
    """Store `record` in self.nodes under the key given by record_key.

    An existing entry with the same key is overwritten.
    """
    key = self.record_key(record)
    self.nodes[key] = record
def blacklist(self, key):
    """Add `key` to this tracker's blacklist set.

    update_from() checks incoming item keys against this set.
    """
    blocked_keys = self._blacklist
    blocked_keys.add(key)
def update_record(self, key, item):
    """Replace the tracked item on the record stored under `key`.

    The record is looked up in self.nodes, and `item` is assigned to
    the record attribute named by self.RECORD_ATTR.
    """
    record = self.nodes[key]
    setattr(record, self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Node tracker whose records are keyed by the cloud node's id."""

    # Record attribute that holds the item this tracker manages.
    RECORD_ATTR = 'cloud_node'
    # Record attribute that holds the counterpart item; unpaired()
    # yields the records where it is None.
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the tracking key for a cloud node: its `id` attribute."""
        return cloud_node.id
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78 RECORD_ATTR = 'arvados_node'
79 PAIR_ATTR = 'cloud_node'
80 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
82 def find_stale_node(self, stale_time):
83 for record in self.nodes.itervalues():
84 node = record.arvados_node
85 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87 not cnode.timestamp_fresh(record.assignment_time,
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
100 demand, and stops the corresponding ComputeNode actors when their work
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
123 self._cloud_driver = self._new_cloud()
124 self._later = self.actor_ref.proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.max_total_price = max_total_price
131 self.poll_stale_after = poll_stale_after
132 self.boot_fail_after = boot_fail_after
133 self.node_stale_after = node_stale_after
135 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136 poll_actor = locals()[poll_name + '_actor']
137 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139 self.last_polls[poll_name] = -self.poll_stale_after
140 self.cloud_nodes = _CloudNodeTracker()
141 self.arvados_nodes = _ArvadosNodeTracker()
142 self.booting = {} # Actor IDs to ComputeNodeSetupActors
143 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
144 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
145 self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
148 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149 self._logger.debug("Daemon started")
151 def _update_poll_time(self, poll_key):
152 self.last_polls[poll_key] = time.time()
154 def _pair_nodes(self, node_record, arvados_node):
155 self._logger.info("Cloud node %s is now paired with Arvados node %s",
156 node_record.cloud_node.name, arvados_node['uuid'])
157 self._arvados_nodes_actor.subscribe_to(
158 arvados_node['uuid'], node_record.actor.update_arvados_node)
159 node_record.arvados_node = arvados_node
160 self.arvados_nodes.add(node_record)
162 def _new_node(self, cloud_node):
163 start_time = self._cloud_driver.node_start_time(cloud_node)
164 shutdown_timer = cnode.ShutdownTimer(start_time,
165 self.shutdown_windows)
166 actor = self._node_actor.start(
167 cloud_node=cloud_node,
168 cloud_node_start_time=start_time,
169 shutdown_timer=shutdown_timer,
170 cloud_fqdn_func=self._cloud_driver.node_fqdn,
171 update_actor=self._cloud_updater,
172 timer_actor=self._timer,
174 poll_stale_after=self.poll_stale_after,
175 node_stale_after=self.node_stale_after,
176 cloud_client=self._cloud_driver,
177 boot_fail_after=self.boot_fail_after).proxy()
178 actor.subscribe(self._later.node_can_shutdown)
179 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
180 actor.update_cloud_node)
181 record = _ComputeNodeRecord(actor, cloud_node)
184 def update_cloud_nodes(self, nodelist):
185 self._update_poll_time('cloud_nodes')
186 for key, node in self.cloud_nodes.update_from(nodelist):
187 self._logger.info("Registering new cloud node %s", key)
188 if key in self.booted:
189 record = self.booted.pop(key)
191 record = self._new_node(node)
192 self.cloud_nodes.add(record)
193 for arv_rec in self.arvados_nodes.unpaired():
194 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
195 self._pair_nodes(record, arv_rec.arvados_node)
197 for key, record in self.cloud_nodes.orphans.iteritems():
198 if key in self.shutdowns:
200 self.shutdowns[key].stop().get()
201 except pykka.ActorDeadError:
203 del self.shutdowns[key]
204 del self.sizes_booting_shutdown[key]
206 record.cloud_node = None
208 def update_arvados_nodes(self, nodelist):
209 self._update_poll_time('arvados_nodes')
210 for key, node in self.arvados_nodes.update_from(nodelist):
211 self._logger.info("Registering new Arvados node %s", key)
212 record = _ComputeNodeRecord(arvados_node=node)
213 self.arvados_nodes.add(record)
214 for arv_rec in self.arvados_nodes.unpaired():
215 arv_node = arv_rec.arvados_node
216 for cloud_rec in self.cloud_nodes.unpaired():
217 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
218 self._pair_nodes(cloud_rec, arv_node)
221 def _nodes_booting(self, size):
223 for c in self.booting.iterkeys()
224 if size is None or self.sizes_booting_shutdown[c].id == size.id)
226 for c in self.booted.itervalues()
227 if size is None or c.cloud_node.size.id == size.id)
230 def _nodes_unpaired(self, size):
232 for c in self.cloud_nodes.unpaired()
233 if size is None or c.cloud_node.size.id == size.id)
235 def _nodes_booted(self, size):
237 for c in self.cloud_nodes.nodes.itervalues()
238 if size is None or c.cloud_node.size.id == size.id)
240 def _nodes_up(self, size):
241 up = self._nodes_booting(size) + self._nodes_booted(size)
244 def _total_price(self):
246 cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
247 for c in self.booting.iterkeys())
248 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
249 for i in (self.booted, self.cloud_nodes.nodes)
250 for c in i.itervalues())
253 def _nodes_busy(self, size):
254 return sum(1 for busy in
255 pykka.get_all(rec.actor.in_state('busy') for rec in
256 self.cloud_nodes.nodes.itervalues()
257 if rec.cloud_node.size.id == size.id)
def _nodes_missing(self, size):
    """Count tracked cloud nodes of `size` whose Arvados node is missing.

    Only records whose cloud node size id equals size.id, and whose
    actor-reported cloud node id is not a key of self.shutdowns, are
    considered.  A node counts when its actor has an Arvados node and
    cnode.arvados_node_missing() is true for it with
    self.node_stale_after.
    """
    stale_after = self.node_stale_after
    # Kept as a generator so each record's filter is evaluated lazily
    # while pykka.get_all() consumes the futures.
    arvados_futures = (
        rec.actor.arvados_node
        for rec in self.cloud_nodes.nodes.itervalues()
        if rec.cloud_node.size.id == size.id
        and rec.actor.cloud_node.get().id not in self.shutdowns)
    missing = 0
    for arv_node in pykka.get_all(arvados_futures):
        if arv_node and cnode.arvados_node_missing(arv_node, stale_after):
            missing += 1
    return missing
267 def _size_wishlist(self, size):
268 return sum(1 for c in self.last_wishlist if c.id == size.id)
270 def _size_shutdowns(self, size):
272 for c in self.shutdowns.iterkeys():
274 if self.sizes_booting_shutdown[c].id == size.id:
276 except pykka.ActorDeadError:
280 def _nodes_wanted(self, size):
281 total_up_count = self._nodes_up(None)
282 under_min = self.min_nodes - total_up_count
283 over_max = total_up_count - self.max_nodes
284 total_price = self._total_price()
288 elif under_min > 0 and size.id == self.min_cloud_size.id:
291 booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
292 shutdown_count = self._size_shutdowns(size)
293 busy_count = self._nodes_busy(size)
294 up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
296 self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
297 self._size_wishlist(size),
298 up_count + busy_count,
300 up_count - booting_count,
304 wanted = self._size_wishlist(size) - up_count
305 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
306 can_boot = int((self.max_total_price - total_price) / size.price)
308 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
309 size.name, size.price, self.max_total_price, total_price)
314 def _nodes_excess(self, size):
315 up_count = self._nodes_up(size) - self._size_shutdowns(size)
316 if size.id == self.min_cloud_size.id:
317 up_count -= self.min_nodes
318 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
320 def update_server_wishlist(self, wishlist):
321 self._update_poll_time('server_wishlist')
322 self.last_wishlist = wishlist
323 for size in reversed(self.server_calculator.cloud_sizes):
325 nodes_wanted = self._nodes_wanted(size)
327 self._later.start_node(size)
328 elif (nodes_wanted < 0) and self.booting:
329 self._later.stop_booting_node(size)
330 except Exception as e:
331 self._logger.exception("while calculating nodes wanted for size %s", size)
333 def _check_poll_freshness(orig_func):
334 """Decorator to inhibit a method when poll information is stale.
336 This decorator checks the timestamps of all the poll information the
337 daemon has received. The decorated method is only called if none
338 of the timestamps are considered stale.
340 @functools.wraps(orig_func)
341 def wrapper(self, *args, **kwargs):
343 if all(now - t < self.poll_stale_after
344 for t in self.last_polls.itervalues()):
345 return orig_func(self, *args, **kwargs)
350 @_check_poll_freshness
351 def start_node(self, cloud_size):
352 nodes_wanted = self._nodes_wanted(cloud_size)
355 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
356 self._logger.info("Want %i more %s nodes. Booting a node.",
357 nodes_wanted, cloud_size.name)
358 new_setup = self._node_setup.start(
359 timer_actor=self._timer,
360 arvados_client=self._new_arvados(),
361 arvados_node=arvados_node,
362 cloud_client=self._new_cloud(),
363 cloud_size=cloud_size).proxy()
364 self.booting[new_setup.actor_ref.actor_urn] = new_setup
365 self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
367 if arvados_node is not None:
368 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
370 new_setup.subscribe(self._later.node_up)
372 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes from `actor` and resolve them together.

    Each name in `attr_names` is read from `actor`, and the collected
    values are passed to pykka.get_all(), whose result is returned.
    """
    pending = [getattr(actor, attr_name) for attr_name in attr_names]
    return pykka.get_all(pending)
377 def node_up(self, setup_proxy):
378 cloud_node = setup_proxy.cloud_node.get()
379 del self.booting[setup_proxy.actor_ref.actor_urn]
380 del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
383 if cloud_node is not None:
384 record = self.cloud_nodes.get(cloud_node.id)
386 record = self._new_node(cloud_node)
387 self.booted[cloud_node.id] = record
388 self._timer.schedule(time.time() + self.boot_fail_after,
389 self._later.shutdown_unpaired_node, cloud_node.id)
391 @_check_poll_freshness
392 def stop_booting_node(self, size):
393 nodes_excess = self._nodes_excess(size)
394 if (nodes_excess < 1) or not self.booting:
396 for key, node in self.booting.iteritems():
397 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
398 del self.booting[key]
399 del self.sizes_booting_shutdown[key]
402 self._later.stop_booting_node(size)
405 def _begin_node_shutdown(self, node_actor, cancellable):
406 cloud_node_obj = node_actor.cloud_node.get()
407 cloud_node_id = cloud_node_obj.id
408 if cloud_node_id in self.shutdowns:
410 shutdown = self._node_shutdown.start(
411 timer_actor=self._timer, cloud_client=self._new_cloud(),
412 arvados_client=self._new_arvados(),
413 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
414 self.shutdowns[cloud_node_id] = shutdown
415 self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
416 shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin a cancellable shutdown of a node if its size is in excess.

    The node's size is read from the actor's cloud node.  Nothing
    happens unless _nodes_excess() for that size is greater than zero.
    """
    cloud_size = node_actor.cloud_node.get().size
    excess = self._nodes_excess(cloud_size)
    if excess > 0:
        self._begin_node_shutdown(node_actor, cancellable=True)
423 def shutdown_unpaired_node(self, cloud_node_id):
424 for record_dict in [self.cloud_nodes, self.booted]:
425 if cloud_node_id in record_dict:
426 record = record_dict[cloud_node_id]
430 if not record.actor.in_state('idle', 'busy').get():
431 self._begin_node_shutdown(record.actor, cancellable=False)
433 def node_finished_shutdown(self, shutdown_actor):
434 cloud_node, success, cancel_reason = self._get_actor_attrs(
435 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
436 shutdown_actor.stop()
437 cloud_node_id = cloud_node.id
439 if cancel_reason == self._node_shutdown.NODE_BROKEN:
440 self.cloud_nodes.blacklist(cloud_node_id)
441 del self.shutdowns[cloud_node_id]
442 del self.sizes_booting_shutdown[cloud_node_id]
443 elif cloud_node_id in self.booted:
444 self.booted.pop(cloud_node_id).actor.stop()
445 del self.shutdowns[cloud_node_id]
446 del self.sizes_booting_shutdown[cloud_node_id]
449 self._logger.info("Shutting down after signal.")
450 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
451 setup_stops = {key: node.stop_if_no_cloud_node()
452 for key, node in self.booting.iteritems()}
453 self.booting = {key: self.booting[key]
454 for key in setup_stops if not setup_stops[key].get()}
455 self._later.await_shutdown()
457 def await_shutdown(self):
459 self._timer.schedule(time.time() + 1, self._later.await_shutdown)