from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud provider nodes, keyed by the provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return an Arvados node record that has been stale for stale_time.

        A node is a reuse candidate when both its own modification time
        and its last assignment time are no longer fresh.  Returns None
        when no tracked node qualifies.
        """
        # BUG FIX: the condition was left unclosed and the method returned
        # nothing; restored per the timestamp_fresh(timestamp, fresh_time)
        # call pattern and the caller, which uses the returned node.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        # NOTE(review): the signature was truncated in the copy reviewed;
        # `max_total_price=0` is restored because the body assigns
        # self.max_total_price from it.  A falsy value disables the price
        # cap (see the truth test in _nodes_wanted).
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        # Proxy that posts messages to our own mailbox asynchronously.
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        # BUG FIX: last_polls was used below without being initialized.
        self.last_polls = {}   # poll name -> timestamp of last update
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            # Pre-date the poll so freshness checks fail until data arrives.
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}      # Actor IDs to ComputeNodeSetupActors
        self.booted = {}       # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}    # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {}  # Actor IDs or Cloud node IDs to node size
        # BUG FIX: last_wishlist is read by _size_wishlist before the first
        # wishlist poll; start empty.
        self.last_wishlist = []
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")
151 def _update_poll_time(self, poll_key):
152 self.last_polls[poll_key] = time.time()
154 def _resume_node(self, node_record):
155 node_record.actor.resume_node()
157 def _pair_nodes(self, node_record, arvados_node):
158 self._logger.info("Cloud node %s is now paired with Arvados node %s",
159 node_record.cloud_node.name, arvados_node['uuid'])
160 self._arvados_nodes_actor.subscribe_to(
161 arvados_node['uuid'], node_record.actor.update_arvados_node)
162 node_record.arvados_node = arvados_node
163 self.arvados_nodes.add(node_record)
165 def _new_node(self, cloud_node):
166 start_time = self._cloud_driver.node_start_time(cloud_node)
167 shutdown_timer = cnode.ShutdownTimer(start_time,
168 self.shutdown_windows)
169 actor = self._node_actor.start(
170 cloud_node=cloud_node,
171 cloud_node_start_time=start_time,
172 shutdown_timer=shutdown_timer,
173 cloud_fqdn_func=self._cloud_driver.node_fqdn,
174 update_actor=self._cloud_updater,
175 timer_actor=self._timer,
177 poll_stale_after=self.poll_stale_after,
178 node_stale_after=self.node_stale_after,
179 cloud_client=self._cloud_driver,
180 boot_fail_after=self.boot_fail_after)
181 actorTell = actor.tell_proxy()
182 actorTell.subscribe(self._later.node_can_shutdown)
183 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
184 actorTell.update_cloud_node)
185 record = _ComputeNodeRecord(actor.proxy(), cloud_node)
188 def update_cloud_nodes(self, nodelist):
189 self._update_poll_time('cloud_nodes')
190 for key, node in self.cloud_nodes.update_from(nodelist):
191 self._logger.info("Registering new cloud node %s", key)
192 if key in self.booted:
193 record = self.booted.pop(key)
195 record = self._new_node(node)
196 self.cloud_nodes.add(record)
197 for arv_rec in self.arvados_nodes.unpaired():
198 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
199 self._pair_nodes(record, arv_rec.arvados_node)
201 for key, record in self.cloud_nodes.orphans.iteritems():
202 if key in self.shutdowns:
204 self.shutdowns[key].stop().get()
205 except pykka.ActorDeadError:
207 del self.shutdowns[key]
208 del self.sizes_booting_shutdown[key]
210 record.cloud_node = None
212 def update_arvados_nodes(self, nodelist):
213 self._update_poll_time('arvados_nodes')
214 for key, node in self.arvados_nodes.update_from(nodelist):
215 self._logger.info("Registering new Arvados node %s", key)
216 record = _ComputeNodeRecord(arvados_node=node)
217 self.arvados_nodes.add(record)
218 for arv_rec in self.arvados_nodes.unpaired():
219 arv_node = arv_rec.arvados_node
220 for cloud_rec in self.cloud_nodes.unpaired():
221 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
222 self._pair_nodes(cloud_rec, arv_node)
224 for rec in self.arvados_nodes.nodes.itervalues():
225 if (rec.arvados_node["info"].get("slurm_state") == "down" and
226 rec.cloud_node is not None and
227 rec.cloud_node.id not in self.shutdowns):
228 self._resume_node(rec)
230 def _nodes_booting(self, size):
232 for c in self.booting.iterkeys()
233 if size is None or self.sizes_booting_shutdown[c].id == size.id)
235 for c in self.booted.itervalues()
236 if size is None or c.cloud_node.size.id == size.id)
239 def _nodes_unpaired(self, size):
241 for c in self.cloud_nodes.unpaired()
242 if size is None or c.cloud_node.size.id == size.id)
244 def _nodes_booted(self, size):
246 for c in self.cloud_nodes.nodes.itervalues()
247 if size is None or c.cloud_node.size.id == size.id)
249 def _nodes_down(self, size):
250 return sum(1 for down in
251 pykka.get_all(rec.actor.in_state('down') for rec in
252 self.cloud_nodes.nodes.itervalues()
253 if size is None or rec.cloud_node.size.id == size.id)
256 def _nodes_up(self, size):
257 up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
260 def _total_price(self):
262 cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
263 for c in self.booting.iterkeys())
264 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
265 for i in (self.booted, self.cloud_nodes.nodes)
266 for c in i.itervalues())
269 def _nodes_busy(self, size):
270 return sum(1 for busy in
271 pykka.get_all(rec.actor.in_state('busy') for rec in
272 self.cloud_nodes.nodes.itervalues()
273 if rec.cloud_node.size.id == size.id)
276 def _nodes_missing(self, size):
277 return sum(1 for arv_node in
278 pykka.get_all(rec.actor.arvados_node for rec in
279 self.cloud_nodes.nodes.itervalues()
280 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
281 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
283 def _size_wishlist(self, size):
284 return sum(1 for c in self.last_wishlist if c.id == size.id)
286 def _size_shutdowns(self, size):
288 for c in self.shutdowns.iterkeys():
290 if self.sizes_booting_shutdown[c].id == size.id:
292 except pykka.ActorDeadError:
296 def _nodes_wanted(self, size):
297 total_up_count = self._nodes_up(None)
298 under_min = self.min_nodes - total_up_count
299 over_max = total_up_count - self.max_nodes
300 total_price = self._total_price()
304 elif under_min > 0 and size.id == self.min_cloud_size.id:
307 booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
308 shutdown_count = self._size_shutdowns(size)
309 busy_count = self._nodes_busy(size)
310 up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
312 self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
313 self._size_wishlist(size),
314 up_count + busy_count,
316 up_count - booting_count,
320 wanted = self._size_wishlist(size) - up_count
321 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
322 can_boot = int((self.max_total_price - total_price) / size.price)
324 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
325 size.name, size.price, self.max_total_price, total_price)
330 def _nodes_excess(self, size):
331 up_count = self._nodes_up(size) - self._size_shutdowns(size)
332 if size.id == self.min_cloud_size.id:
333 up_count -= self.min_nodes
334 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
336 def update_server_wishlist(self, wishlist):
337 self._update_poll_time('server_wishlist')
338 self.last_wishlist = wishlist
339 for size in reversed(self.server_calculator.cloud_sizes):
341 nodes_wanted = self._nodes_wanted(size)
343 self._later.start_node(size)
344 elif (nodes_wanted < 0) and self.booting:
345 self._later.stop_booting_node(size)
346 except Exception as e:
347 self._logger.exception("while calculating nodes wanted for size %s", size)
349 def _check_poll_freshness(orig_func):
350 """Decorator to inhibit a method when poll information is stale.
352 This decorator checks the timestamps of all the poll information the
353 daemon has received. The decorated method is only called if none
354 of the timestamps are considered stale.
356 @functools.wraps(orig_func)
357 def wrapper(self, *args, **kwargs):
359 if all(now - t < self.poll_stale_after
360 for t in self.last_polls.itervalues()):
361 return orig_func(self, *args, **kwargs)
366 @_check_poll_freshness
367 def start_node(self, cloud_size):
368 nodes_wanted = self._nodes_wanted(cloud_size)
371 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
372 self._logger.info("Want %i more %s nodes. Booting a node.",
373 nodes_wanted, cloud_size.name)
374 new_setup = self._node_setup.start(
375 timer_actor=self._timer,
376 arvados_client=self._new_arvados(),
377 arvados_node=arvados_node,
378 cloud_client=self._new_cloud(),
379 cloud_size=cloud_size).proxy()
380 self.booting[new_setup.actor_ref.actor_urn] = new_setup
381 self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
383 if arvados_node is not None:
384 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
386 new_setup.subscribe(self._later.node_up)
388 self._later.start_node(cloud_size)
390 def _get_actor_attrs(self, actor, *attr_names):
391 return pykka.get_all([getattr(actor, name) for name in attr_names])
393 def node_up(self, setup_proxy):
394 cloud_node = setup_proxy.cloud_node.get()
395 del self.booting[setup_proxy.actor_ref.actor_urn]
396 del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
399 if cloud_node is not None:
400 record = self.cloud_nodes.get(cloud_node.id)
402 record = self._new_node(cloud_node)
403 self.booted[cloud_node.id] = record
404 self._timer.schedule(time.time() + self.boot_fail_after,
405 self._later.shutdown_unpaired_node, cloud_node.id)
407 @_check_poll_freshness
408 def stop_booting_node(self, size):
409 nodes_excess = self._nodes_excess(size)
410 if (nodes_excess < 1) or not self.booting:
412 for key, node in self.booting.iteritems():
413 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
414 del self.booting[key]
415 del self.sizes_booting_shutdown[key]
418 self._later.stop_booting_node(size)
421 def _begin_node_shutdown(self, node_actor, cancellable):
422 cloud_node_obj = node_actor.cloud_node.get()
423 cloud_node_id = cloud_node_obj.id
424 if cloud_node_id in self.shutdowns:
426 shutdown = self._node_shutdown.start(
427 timer_actor=self._timer, cloud_client=self._new_cloud(),
428 arvados_client=self._new_arvados(),
429 node_monitor=node_actor.actor_ref, cancellable=cancellable)
430 self.shutdowns[cloud_node_id] = shutdown.proxy()
431 self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
432 shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
434 @_check_poll_freshness
435 def node_can_shutdown(self, node_actor):
436 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
437 self._begin_node_shutdown(node_actor, cancellable=True)
439 def shutdown_unpaired_node(self, cloud_node_id):
440 for record_dict in [self.cloud_nodes, self.booted]:
441 if cloud_node_id in record_dict:
442 record = record_dict[cloud_node_id]
446 if not record.actor.in_state('idle', 'busy').get():
447 self._begin_node_shutdown(record.actor, cancellable=False)
449 def node_finished_shutdown(self, shutdown_actor):
450 cloud_node, success, cancel_reason = self._get_actor_attrs(
451 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
452 shutdown_actor.stop()
453 cloud_node_id = cloud_node.id
455 if cancel_reason == self._node_shutdown.NODE_BROKEN:
456 self.cloud_nodes.blacklist(cloud_node_id)
457 elif cloud_node_id in self.booted:
458 self.booted.pop(cloud_node_id).actor.stop()
459 del self.shutdowns[cloud_node_id]
460 del self.sizes_booting_shutdown[cloud_node_id]
463 self._logger.info("Shutting down after signal.")
464 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
465 setup_stops = {key: node.stop_if_no_cloud_node()
466 for key, node in self.booting.iteritems()}
467 self.booting = {key: self.booting[key]
468 for key in setup_stops if not setup_stops[key].get()}
469 self._later.await_shutdown()
    # Poll loop run during shutdown: re-checks progress once per second.
    # NOTE(review): the body appears truncated in the copy reviewed -- a
    # condition (e.g. only reschedule while shutdowns are still pending,
    # otherwise stop the actor) seems to be missing between these two
    # lines; confirm against the full source before editing.
    def await_shutdown(self):
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)