3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
# Plain record bundling what is tracked for one compute node: a cloud node,
# an Arvados node and an assignment time (float('-inf') by default).
# NOTE(review): `actor` is accepted by the constructor but no visible line
# stores it, and the embedded numbering skips 18 -- a line (presumably
# `self.actor = actor`, since other code reads `record.actor`) appears to be
# missing from this view; confirm against the upstream file.
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
# Base class for the node trackers defined below it.  Subclasses supply
# RECORD_ATTR, PAIR_ATTR and item_key, which the methods here rely on.
24 class _BaseNodeTracker(object):
# NOTE(review): the `def __init__` line and any sibling initializers
# (embedded lines 25-27) are not visible.  Later methods use `self.nodes`
# and `self.orphans`, so presumably they are created next to `_blacklist`
# -- confirm.
# Keys registered through blacklist(); update_from() tests keys against it.
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
# Builds a wrapper that calls the named dict method on self.nodes.  Only
# __name__ and __doc__ are copied from the dict method onto the wrapper.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
# NOTE(review): embedded line 36 is not visible.  The loop below stores the
# result of _proxy_method(), so presumably it returns `wrapper` -- confirm.
# Writes each proxy into the class-body namespace under the dict method name.
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
def record_key(self, record):
    """Return the tracking key for ``record``.

    The item is read from the record attribute named by
    ``self.RECORD_ATTR`` and converted to a key with ``self.item_key``.
    """
    tracked_item = getattr(record, self.RECORD_ATTR)
    return self.item_key(tracked_item)
def add(self, record):
    """Store ``record`` in ``self.nodes`` under ``self.record_key(record)``.

    A record already stored under the same key is replaced.
    """
    self.nodes[self.record_key(record)] = record
def blacklist(self, key):
    """Add ``key`` to the set of blacklisted keys (``self._blacklist``).

    Adding a key that is already present has no further effect.
    """
    self._blacklist.add(key)
def update_record(self, key, item):
    """Replace the tracked item on the record stored under ``key``.

    The record's attribute named by ``self.RECORD_ATTR`` is set to
    ``item``.  Raises ``KeyError`` if ``key`` is not in ``self.nodes``.
    """
    record = self.nodes[key]
    setattr(record, self.RECORD_ATTR, item)
# Reconciles the tracked records with a freshly polled `response`.
# NOTE(review): several lines are missing from this view (embedded numbering
# skips 55, 58-60 and 62-63), including whatever binds `item`.  Callers
# iterate the result as (key, item) pairs, so presumably this is a generator
# over previously unknown items -- confirm against the upstream file.
53 def update_from(self, response):
# Start by treating every currently tracked key as absent from the response.
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
# Blacklisted keys are handled separately (branch body not visible).
57 if key in self._blacklist:
# Refresh the stored record with the newly polled item.
61 self.update_record(key, item)
# Every key still in `unseen` is removed from self.nodes; the removed
# records are kept in self.orphans, replacing its previous contents.
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
# NOTE(review): the `def` line for this body (embedded line 66) is not
# visible.  Other code in this file calls `unpaired()` on trackers, so
# presumably this is that method's body -- confirm.
# Returns a generator over tracked records whose PAIR_ATTR attribute is None.
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker for cloud nodes, keyed by each cloud node's ``id``."""

    # Name of the record attribute holding the item this tracker manages.
    RECORD_ATTR = 'cloud_node'
    # Name of the record attribute holding the counterpart item; a record
    # whose counterpart is None is reported as unpaired by the base class.
    PAIR_ATTR = 'arvados_node'
    # Key function: a cloud node is identified by its ``id`` attribute.
    item_key = staticmethod(lambda cloud_node: cloud_node.id)
# Tracker for Arvados nodes: records are stored by their 'arvados_node'
# attribute and keyed by that node's 'uuid' entry; 'cloud_node' is the
# counterpart attribute.
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78 RECORD_ATTR = 'arvados_node'
79 PAIR_ATTR = 'cloud_node'
80 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
# Walks the tracked records applying two freshness tests: one on the Arvados
# node's mtime and one on the record's assignment_time.
# NOTE(review): the remaining arguments of both calls and the body of the
# `if` (embedded lines 86, 88-90) are not visible, so the return value
# cannot be confirmed from here.  start_node() compares the result to None.
82 def find_stale_node(self, stale_time):
83 for record in self.nodes.itervalues():
84 node = record.arvados_node
85 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87 not cnode.timestamp_fresh(record.assignment_time,
# Daemon actor class, derived from the configured actor_class; its docstring
# follows.
# NOTE(review): the end of the docstring, including its closing quotes
# (embedded lines 101-102), is not visible in this view.
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
100 demand, and stops the corresponding ComputeNode actors when their work
# Stores the collaborating actors, factories and tuning parameters,
# subscribes this actor to the three pollers and creates the node-tracking
# tables.
# NOTE(review): embedded line 114 (the end of the signature) is not visible.
# `max_total_price` is read below but is not among the visible parameters,
# so presumably it is declared on that line -- confirm.
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
# One cloud client is created up front from the factory and kept.
123 self._cloud_driver = self._new_cloud()
# Proxy to this actor itself; its attributes are handed out as callbacks and
# used to queue further calls on this actor.
124 self._later = self.actor_ref.proxy()
125 self.shutdown_windows = shutdown_windows
126 self.server_calculator = server_calculator
127 self.min_cloud_size = self.server_calculator.cheapest_size()
128 self.min_nodes = min_nodes
129 self.max_nodes = max_nodes
130 self.max_total_price = max_total_price
131 self.poll_stale_after = poll_stale_after
132 self.boot_fail_after = boot_fail_after
133 self.node_stale_after = node_stale_after
# NOTE(review): `self.last_polls` is written in the loop below but its
# creation is not visible (embedded line 134 is missing) -- confirm.
# For each poller: subscribe the matching update_<name> method, keep the
# poller as self._<name>_actor, and seed its last poll time with
# -poll_stale_after.
135 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136 poll_actor = locals()[poll_name + '_actor']
137 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139 self.last_polls[poll_name] = -self.poll_stale_after
140 self.cloud_nodes = _CloudNodeTracker()
141 self.arvados_nodes = _ArvadosNodeTracker()
142 self.booting = {} # Actor IDs to ComputeNodeSetupActors
143 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
144 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
# NOTE(review): embedded lines 145-146 are not visible.
# Logger name is the class name plus the actor URN with its first 33
# characters dropped.
147 self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
148 self._logger.debug("Daemon started")
def _update_poll_time(self, poll_key):
    """Record the current time as the latest poll time for ``poll_key``.

    The timestamp comes from ``time.time()`` and is stored in
    ``self.last_polls``.
    """
    self.last_polls[poll_key] = time.time()
def _pair_nodes(self, node_record, arvados_node):
    """Pair the cloud node in ``node_record`` with ``arvados_node``.

    Logs the pairing, subscribes the record's actor to updates for the
    Arvados node's UUID, stores ``arvados_node`` on the record and adds
    the record to ``self.arvados_nodes``.
    """
    arvados_uuid = arvados_node['uuid']
    self._logger.info("Cloud node %s is now paired with Arvados node %s",
                      node_record.cloud_node.name, arvados_uuid)
    self._arvados_nodes_actor.subscribe_to(
        arvados_uuid, node_record.actor.update_arvados_node)
    # The record must carry the Arvados node before it is added, because
    # the Arvados tracker derives the key from that attribute.
    node_record.arvados_node = arvados_node
    self.arvados_nodes.add(node_record)
# Starts a monitor actor (self._node_actor) for `cloud_node`, subscribes it
# to cloud node updates, and wraps actor and cloud node in a
# _ComputeNodeRecord.
# NOTE(review): embedded lines 172 and 181 are not visible.  Line 172 sits
# inside the start() argument list; callers assign the result of this method
# (`record = self._new_node(node)`), so line 181 is presumably
# `return record` -- confirm.
161 def _new_node(self, cloud_node):
162 start_time = self._cloud_driver.node_start_time(cloud_node)
163 shutdown_timer = cnode.ShutdownTimer(start_time,
164 self.shutdown_windows)
165 actor = self._node_actor.start(
166 cloud_node=cloud_node,
167 cloud_node_start_time=start_time,
168 shutdown_timer=shutdown_timer,
169 cloud_fqdn_func=self._cloud_driver.node_fqdn,
170 update_actor=self._cloud_updater,
171 timer_actor=self._timer,
173 poll_stale_after=self.poll_stale_after,
174 node_stale_after=self.node_stale_after,
175 cloud_client=self._cloud_driver,
176 boot_fail_after=self.boot_fail_after).proxy()
# The daemon's node_can_shutdown is registered as the monitor's subscriber.
177 actor.subscribe(self._later.node_can_shutdown)
# Route poll updates for this cloud node id to the monitor actor.
178 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
179 actor.update_cloud_node)
180 record = _ComputeNodeRecord(actor, cloud_node)
# Handles a fresh cloud node listing: registers a record for each
# (key, node) pair produced by update_from(), offers unpaired Arvados nodes
# to the record's actor, then processes nodes that dropped out of the list.
# NOTE(review): embedded lines 189, 198, 201 and 203 are not visible
# (presumably `else:`, `try:`, a handler body and one further statement),
# so the exact branch structure cannot be confirmed from here.
183 def update_cloud_nodes(self, nodelist):
184 self._update_poll_time('cloud_nodes')
185 for key, node in self.cloud_nodes.update_from(nodelist):
186 self._logger.info("Registering new cloud node %s", key)
# A node already present in self.booted has a record; it is moved out of
# that table rather than built again.
187 if key in self.booted:
188 record = self.booted.pop(key)
190 record = self._new_node(node)
191 self.cloud_nodes.add(record)
# Pair with an Arvados node the record's actor accepts.
192 for arv_rec in self.arvados_nodes.unpaired():
193 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
194 self._pair_nodes(record, arv_rec.arvados_node)
# Orphans are records update_from() removed because their key was absent
# from the listing.
196 for key, record in self.cloud_nodes.orphans.iteritems():
197 if key in self.shutdowns:
# Stop the shutdown actor and wait for the stop to finish; an
# ActorDeadError is caught (handler body not visible).
199 self.shutdowns[key].stop().get()
200 except pykka.ActorDeadError:
202 del self.shutdowns[key]
204 record.cloud_node = None
# Handles a fresh Arvados node listing: creates a record for each
# (key, node) pair produced by update_from(), then offers every unpaired
# Arvados node to the actors of the unpaired cloud node records.
# NOTE(review): embedded line 217, directly after the _pair_nodes() call, is
# not visible -- possibly a `break` out of the inner loop; confirm.
206 def update_arvados_nodes(self, nodelist):
207 self._update_poll_time('arvados_nodes')
208 for key, node in self.arvados_nodes.update_from(nodelist):
209 self._logger.info("Registering new Arvados node %s", key)
210 record = _ComputeNodeRecord(arvados_node=node)
211 self.arvados_nodes.add(record)
212 for arv_rec in self.arvados_nodes.unpaired():
213 arv_node = arv_rec.arvados_node
214 for cloud_rec in self.cloud_nodes.unpaired():
# .get() blocks until the cloud node's actor answers the offer.
215 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
216 self._pair_nodes(cloud_rec, arv_node)
# Looks at the setup actors in self.booting and the records in self.booted,
# keeping those whose size id equals size.id; size=None keeps all of them.
# NOTE(review): the lines that open each expression and the return statement
# (embedded lines 220, 223, 226) are not visible.  _nodes_up() adds this
# method's result to another count, so presumably it returns the number of
# matches -- confirm.
219 def _nodes_booting(self, size):
221 for c in self.booting.itervalues()
222 if size is None or c.cloud_size.get().id == size.id)
224 for c in self.booted.itervalues()
225 if size is None or c.cloud_node.size.id == size.id)
# Looks at the unpaired cloud node records, keeping those whose cloud node
# size id equals size.id; size=None keeps all of them.
# NOTE(review): the line that opens the expression (embedded line 229) is
# not visible; presumably it returns the number of matches -- confirm.
228 def _nodes_unpaired(self, size):
230 for c in self.cloud_nodes.unpaired()
231 if size is None or c.cloud_node.size.id == size.id)
# Looks at every tracked cloud node record, keeping those whose cloud node
# size id equals size.id; size=None keeps all of them.
# NOTE(review): the line that opens the expression (embedded line 234) is
# not visible; presumably it returns the number of matches -- confirm.
233 def _nodes_booted(self, size):
235 for c in self.cloud_nodes.nodes.itervalues()
236 if size is None or c.cloud_node.size.id == size.id)
# Adds the results of _nodes_booting() and _nodes_booted() for `size`.
# NOTE(review): embedded line 240 is not visible.  Callers use this method's
# result in arithmetic, so presumably the missing line is `return up` --
# confirm.
238 def _nodes_up(self, size):
239 up = self._nodes_booting(size) + self._nodes_booted(size)
# Accumulates into `cost` the price of the size found by
# server_calculator.find_size() for each setup actor in self.booting, and
# for each record in self.booted and self.cloud_nodes.nodes.
# NOTE(review): the initialisation of `cost` (embedded line 243) and the
# line after the sums (249, presumably `return cost`) are not visible --
# confirm.
242 def _total_price(self):
244 cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
245 for c in self.booting.itervalues())
246 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
247 for i in (self.booted, self.cloud_nodes.nodes)
248 for c in i.itervalues())
# Asks the actor of every tracked cloud node of the given size whether it is
# in state 'busy', collects the answers with pykka.get_all(), and sums 1 per
# collected answer.
# NOTE(review): the closing line of the expression (embedded line 256) is
# not visible, so any filter applied to `busy` (presumably `if busy`) cannot
# be confirmed from here.
251 def _nodes_busy(self, size):
252 return sum(1 for busy in
253 pykka.get_all(rec.actor.in_state('busy') for rec in
254 self.cloud_nodes.nodes.itervalues()
255 if rec.cloud_node.size.id == size.id)
def _nodes_missing(self, size):
    """Count tracked cloud nodes of ``size`` whose Arvados node is missing.

    Only records whose cloud node size id equals ``size.id`` and whose
    actor's cloud node id is not in ``self.shutdowns`` are considered.
    A node is counted when its actor reports an Arvados node for which
    ``cnode.arvados_node_missing(node, self.node_stale_after)`` is true.
    """
    # Kept as a generator so the blocking ``cloud_node.get()`` filter is
    # evaluated lazily while pykka.get_all() consumes it.
    arvados_node_requests = (
        rec.actor.arvados_node
        for rec in self.cloud_nodes.nodes.itervalues()
        if rec.cloud_node.size.id == size.id
        and rec.actor.cloud_node.get().id not in self.shutdowns)
    return sum(
        1 for arv_node in pykka.get_all(arvados_node_requests)
        # Actors reporting no Arvados node are skipped.
        if arv_node
        and cnode.arvados_node_missing(arv_node, self.node_stale_after))
def _size_wishlist(self, size):
    """Return how many entries of ``self.last_wishlist`` match ``size``.

    Entries are compared by their ``id`` attribute.
    """
    return sum(1 for wanted in self.last_wishlist if wanted.id == size.id)
# Walks the shutdown actors in self.shutdowns, comparing the size id of each
# actor's cloud node with size.id; pykka.ActorDeadError is caught.
# NOTE(review): most of the body (embedded lines 269, 271, 273, 275-276) is
# not visible.  Callers use the result in arithmetic, so presumably it
# returns a count of matching shutdowns -- confirm.
268 def _size_shutdowns(self, size):
270 for c in self.shutdowns.itervalues():
272 if c.cloud_node.get().size.id == size.id:
274 except pykka.ActorDeadError:
# Works out how many more nodes of `size` are wanted, from the wishlist, the
# node counts for this size, the min_nodes/max_nodes limits and
# max_total_price.
# NOTE(review): many lines are missing from this view (embedded numbering
# skips 283-285, 287-288, 293, 297, 299-301, 305 and 308-310), including
# every return statement and the first branch of the if/elif chain.  The
# comments below describe only the visible statements.
278 def _nodes_wanted(self, size):
279 total_up_count = self._nodes_up(None)
# Positive when the total is below min_nodes / above max_nodes respectively.
280 under_min = self.min_nodes - total_up_count
281 over_max = total_up_count - self.max_nodes
282 total_price = self._total_price()
# Below the minimum, this branch applies only to the cheapest size.
286 elif under_min > 0 and size.id == self.min_cloud_size.id:
289 booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
290 shutdown_count = self._size_shutdowns(size)
291 busy_count = self._nodes_busy(size)
# Nodes of this size that are up, excluding those shutting down, busy, or
# missing.
292 up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
294 self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
295 self._size_wishlist(size),
296 up_count + busy_count,
298 up_count - booting_count,
302 wanted = self._size_wishlist(size) - up_count
# A max_total_price of 0 (falsy) disables the price cap check.
303 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
# Number of nodes of this size that still fit under the price cap.
304 can_boot = int((self.max_total_price - total_price) / size.price)
306 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
307 size.name, size.price, self.max_total_price, total_price)
def _nodes_excess(self, size):
    """Return how many nodes of ``size`` are surplus to current needs.

    Starts from the nodes that are up and not shutting down.  For the
    cheapest size (``self.min_cloud_size``) the ``self.min_nodes``
    reserve is subtracted.  Busy nodes and nodes on the wishlist are
    then subtracted; the result can be negative.
    """
    surplus = self._nodes_up(size) - self._size_shutdowns(size)
    if size.id == self.min_cloud_size.id:
        # The minimum node count is held in the cheapest size only.
        surplus -= self.min_nodes
    return surplus - self._nodes_busy(size) - self._size_wishlist(size)
# Handles a fresh wishlist: stores it, then walks the cloud sizes in
# reverse order of server_calculator.cloud_sizes and queues start_node or
# stop_booting_node on this actor depending on _nodes_wanted().
# NOTE(review): embedded lines 322 and 324 are not visible (presumably
# `try:` and the `if` that precedes the `elif`) -- confirm the condition
# under which start_node is queued.
318 def update_server_wishlist(self, wishlist):
319 self._update_poll_time('server_wishlist')
320 self.last_wishlist = wishlist
321 for size in reversed(self.server_calculator.cloud_sizes):
323 nodes_wanted = self._nodes_wanted(size)
325 self._later.start_node(size)
# Fewer nodes wanted than are up: only act if something is still booting.
326 elif (nodes_wanted < 0) and self.booting:
327 self._later.stop_booting_node(size)
# An exception is logged with its traceback rather than propagated.
328 except Exception as e:
329 self._logger.exception("while calculating nodes wanted for size %s", size)
# Decorator used on methods of this class; its own docstring follows.  The
# wrapped method runs only when, for every entry t in self.last_polls,
# `now - t` is below self.poll_stale_after.
# NOTE(review): the docstring's closing quotes, the assignment of `now`, the
# stale-poll branch and the decorator's return (embedded lines 337, 340,
# 344-346) are not visible in this view.
331 def _check_poll_freshness(orig_func):
332 """Decorator to inhibit a method when poll information is stale.
334 This decorator checks the timestamps of all the poll information the
335 daemon has received. The decorated method is only called if none
336 of the timestamps are considered stale.
338 @functools.wraps(orig_func)
339 def wrapper(self, *args, **kwargs):
341 if all(now - t < self.poll_stale_after
342 for t in self.last_polls.itervalues()):
343 return orig_func(self, *args, **kwargs)
# Starts a setup actor (self._node_setup) to boot one node of `cloud_size`,
# passing it whatever find_stale_node() returned as the Arvados node.
# Skipped by the decorator when poll data is stale.
# NOTE(review): embedded lines 351-352, 365 and 367 are not visible
# (presumably an early return when no node is wanted, the value assigned to
# assignment_time, and the condition guarding the final re-queue) --
# confirm.
348 @_check_poll_freshness
349 def start_node(self, cloud_size):
350 nodes_wanted = self._nodes_wanted(cloud_size)
353 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
354 self._logger.info("Want %i more %s nodes. Booting a node.",
355 nodes_wanted, cloud_size.name)
# A new Arvados client and a new cloud client are created for the setup
# actor.
356 new_setup = self._node_setup.start(
357 timer_actor=self._timer,
358 arvados_client=self._new_arvados(),
359 arvados_node=arvados_node,
360 cloud_client=self._new_cloud(),
361 cloud_size=cloud_size).proxy()
# Booting setup actors are tracked by their actor URN.
362 self.booting[new_setup.actor_ref.actor_urn] = new_setup
363 if arvados_node is not None:
364 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
# node_up is registered as the setup actor's subscriber.
366 new_setup.subscribe(self._later.node_up)
# Queue another start_node call for the same size.
368 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes of ``actor`` in a single call.

    Each name in ``attr_names`` is looked up on ``actor`` with
    ``getattr`` and the results are resolved together by
    ``pykka.get_all``, whose return value is passed back unchanged.
    """
    requested = [getattr(actor, name) for name in attr_names]
    return pykka.get_all(requested)
# Callback subscribed to setup actors in start_node().  Removes the setup
# actor from self.booting and, when it produced a cloud node, records the
# node in self.booted and schedules a shutdown_unpaired_node call for it at
# boot_fail_after seconds from now.
# NOTE(review): embedded lines 376 and 379 are not visible (line 379 is
# presumably a check on whether cloud_nodes already tracks the node, before
# a new record is created) -- confirm.
373 def node_up(self, setup_proxy):
374 cloud_node = setup_proxy.cloud_node.get()
375 del self.booting[setup_proxy.actor_ref.actor_urn]
377 if cloud_node is not None:
378 record = self.cloud_nodes.get(cloud_node.id)
380 record = self._new_node(cloud_node)
381 self.booted[cloud_node.id] = record
382 self._timer.schedule(time.time() + self.boot_fail_after,
383 self._later.shutdown_unpaired_node, cloud_node.id)
# Stops one setup actor of the given size that has not created a cloud node
# yet, when _nodes_excess() reports a surplus.  Skipped by the decorator
# when poll data is stale.
# NOTE(review): embedded lines 389, 393 and 395 are not visible (presumably
# an early return, the condition guarding the re-queue, and a `break`).
# Deleting from self.booting inside iteritems() is only safe if the loop
# exits right after the deletion -- confirm the missing line does that.
385 @_check_poll_freshness
386 def stop_booting_node(self, size):
387 nodes_excess = self._nodes_excess(size)
388 if (nodes_excess < 1) or not self.booting:
390 for key, node in self.booting.iteritems():
# Untrack the setup actor only if it is of this size and
# stop_if_no_cloud_node() returns a true value.
391 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
392 del self.booting[key]
394 self._later.stop_booting_node(size)
# Starts a shutdown actor (self._node_shutdown) for the cloud node monitored
# by `node_actor` and tracks it in self.shutdowns by cloud node id.
# NOTE(review): embedded line 400, the body of the `if`, is not visible;
# presumably it returns early so a node already shutting down does not get
# a second shutdown actor -- confirm.
397 def _begin_node_shutdown(self, node_actor, cancellable):
398 cloud_node_id = node_actor.cloud_node.get().id
399 if cloud_node_id in self.shutdowns:
# A new cloud client and a new Arvados client are created for the shutdown
# actor.
401 shutdown = self._node_shutdown.start(
402 timer_actor=self._timer, cloud_client=self._new_cloud(),
403 arvados_client=self._new_arvados(),
404 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
405 self.shutdowns[cloud_node_id] = shutdown
# node_finished_shutdown is registered as the shutdown actor's subscriber.
406 shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin a cancellable shutdown of a node if its size has a surplus.

    ``node_actor`` is the monitor actor proxy offering its node for
    shutdown.  Nothing happens unless ``_nodes_excess`` for the node's
    size is positive.  The decorator skips the call when poll data is
    stale.
    """
    node_size = node_actor.cloud_node.get().size
    if self._nodes_excess(node_size) > 0:
        self._begin_node_shutdown(node_actor, cancellable=True)
# Timer callback scheduled by node_up().  Looks the node up in
# self.cloud_nodes and then self.booted, and begins a non-cancellable
# shutdown when its actor is in neither the 'idle' nor the 'busy' state.
# NOTE(review): embedded lines 417-419 are not visible (presumably a
# `break` and a for-`else` that returns when the node is in neither table)
# -- confirm.
413 def shutdown_unpaired_node(self, cloud_node_id):
414 for record_dict in [self.cloud_nodes, self.booted]:
415 if cloud_node_id in record_dict:
416 record = record_dict[cloud_node_id]
420 if not record.actor.in_state('idle', 'busy').get():
421 self._begin_node_shutdown(record.actor, cancellable=False)
# Callback subscribed to shutdown actors in _begin_node_shutdown().  Reads
# the shutdown's cloud node, success flag and cancel reason, stops the
# shutdown actor and removes it from self.shutdowns.
# NOTE(review): embedded line 428 is not visible.  It precedes the
# `if`/`elif` pair, so presumably it is a condition on `success` that
# decides which of the two branches below can run -- confirm.
423 def node_finished_shutdown(self, shutdown_actor):
424 cloud_node, success, cancel_reason = self._get_actor_attrs(
425 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
426 shutdown_actor.stop()
427 cloud_node_id = cloud_node.id
# A shutdown cancelled with reason NODE_BROKEN blacklists the cloud node id.
429 if cancel_reason == self._node_shutdown.NODE_BROKEN:
430 self.cloud_nodes.blacklist(cloud_node_id)
431 del self.shutdowns[cloud_node_id]
# A node still in self.booted is dropped from it and its monitor actor is
# stopped.
432 elif cloud_node_id in self.booted:
433 self.booted.pop(cloud_node_id).actor.stop()
434 del self.shutdowns[cloud_node_id]
# NOTE(review): the `def` line for this body (embedded line 436) is not
# visible; the log message suggests it is the daemon's shutdown handler --
# confirm the method name and signature.
437 self._logger.info("Shutting down after signal.")
# With a negative poll_stale_after the freshness test in
# _check_poll_freshness cannot pass, so decorated methods stop running.
438 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
# Ask every setup actor to stop if it has no cloud node yet; all requests
# are issued before any result is awaited.
439 setup_stops = {key: node.stop_if_no_cloud_node()
440 for key, node in self.booting.iteritems()}
# Keep only the setup actors for which the stop request returned false.
441 self.booting = {key: self.booting[key]
442 for key in setup_stops if not setup_stops[key].get()}
443 self._later.await_shutdown()
# Re-queues itself through the timer one second from now.
# NOTE(review): embedded line 446 is not visible (presumably the condition
# under which waiting continues, e.g. while setup actors remain in
# self.booting), and the method may continue past the end of this view --
# confirm.
445 def await_shutdown(self):
447 self._timer.schedule(time.time() + 1, self._later.await_shutdown)