from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class


class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time


class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}
        self._blacklist = set()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)
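
    # The loop above installs __contains__, __getitem__, __len__, and get as
    # thin wrappers around self.nodes, so expressions like `key in tracker`
    # and `tracker.get(key)` consult the tracked mapping directly.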

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        self._blacklist.add(key)

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in self._blacklist:
                continue
            elif key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
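

# Concrete trackers declare RECORD_ATTR (the record attribute this tracker
# indexes), PAIR_ATTR (the attribute that must be set before a record counts
# as paired), and item_key (how to derive a dictionary key from an item).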
class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
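
    # find_stale_node returns an Arvados node record that is stale on both
    # counts checked below: its modification time and its most recent
    # assignment to a setup actor, so the daemon can hand it to a new setup
    # actor.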

    def find_stale_node(self, stale_time):
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None


class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately. It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """

    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}  # Actor IDs to ComputeNodeSetupActors
        self.booted = {}  # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}  # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()
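
    # _pair_nodes records that a cloud node and an Arvados node now refer to
    # the same compute node: the record is indexed in self.arvados_nodes and
    # the monitor actor is subscribed to updates for that Arvados node.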

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.id, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record
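
    # update_cloud_nodes reconciles the latest cloud node listing with local
    # state: new nodes get a monitor actor and a pairing attempt against
    # unpaired Arvados records; nodes that disappeared from the listing are
    # cleaned up, including any shutdown actor still tracking them.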

    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break
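
    # The helpers below count nodes of a given cloud size for the sizing
    # decisions: _nodes_up counts setup actors plus booted and listed cloud
    # nodes, _nodes_busy counts monitors reporting the 'busy' state,
    # _nodes_missing counts nodes whose paired Arvados record is marked
    # missing, _size_wishlist counts wishlist entries of that size, and
    # _size_shutdowns counts pending shutdowns.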

    def _nodes_up(self, size):
        up = 0
        up += sum(1
                  for c in self.booting.itervalues()
                  if size is None or c.cloud_size.get().id == size.id)
        up += sum(1
                  for i in (self.booted, self.cloud_nodes.nodes)
                  for c in i.itervalues()
                  if size is None or (c.cloud_node.size and c.cloud_node.size.id == size.id))
        return up

    def _total_price(self):
        cost = 0
        cost += sum(c.cloud_size.get().price
                    for c in self.booting.itervalues())
        cost += sum(c.cloud_node.size.price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues()
                    if c.cloud_node.size)
        return cost

    def _nodes_busy(self, size):
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if (rec.cloud_node.size and rec.cloud_node.size.id == size.id))
                   if busy)

    def _nodes_missing(self, size):
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size and rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        return sum(1 for c in self.shutdowns.itervalues()
                   if c.cloud_node.get().size.id == size.id)
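
    # _nodes_wanted decides how many nodes of a size to boot (positive) or
    # shed (negative).  The min_nodes/max_nodes bounds are applied first,
    # then the wishlist demand net of usable nodes already up, and finally
    # the max_total_price budget caps how many nodes can still be booted.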

    def _nodes_wanted(self, size):
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
                                           self._nodes_busy(size) +
                                           self._nodes_missing(size))

        wanted = self._size_wishlist(size) - up_count
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted

    def _nodes_excess(self, size):
        up_count = self._nodes_up(size) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for sz in reversed(self.server_calculator.cloud_sizes):
            size = sz.real
            nodes_wanted = self._nodes_wanted(size)
            if nodes_wanted > 0:
                self._later.start_node(size)
            elif (nodes_wanted < 0) and self.booting:
                self._later.stop_booting_node(size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received. The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        return pykka.get_all([getattr(actor, name) for name in attr_names])
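
    # node_up runs when a setup actor reports success: the new cloud node is
    # put under a monitor actor, and shutdown_unpaired_node is scheduled as a
    # deadline in case the node never pairs with an Arvados record.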

    def node_up(self, setup_proxy):
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        record = self.cloud_nodes.get(cloud_node.id)
        if record is None:
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)
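
    # stop_booting_node cancels one pending setup actor of the given size,
    # but only if that setup actor has not created a cloud node yet
    # (stop_if_no_cloud_node); it re-queues itself while excess remains.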

    @_check_poll_freshness
    def stop_booting_node(self, size):
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                self.cloud_nodes.blacklist(cloud_node_id)
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
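
    # Shutdown sequence: shutdown() stops creating or destroying nodes,
    # cancels setup actors that have not created a cloud node yet, and then
    # await_shutdown polls once a second until no setup actors remain before
    # stopping the daemon actor itself.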

    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()