3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    """Map-like collection of node records, keyed by item_key(item).

    Subclasses define RECORD_ATTR (which _ComputeNodeRecord attribute is
    tracked here), PAIR_ATTR (the attribute linking to the paired record),
    and item_key (function from a tracked item to its dictionary key).
    """
    # NOTE(review): the `__init__` header and the `self.nodes = {}` line are
    # elided from this view; `self.nodes` is clearly a dict (the dict methods
    # proxied below operate on it).
        self._blacklist = set()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # NOTE(review): a `return wrapper` line appears to be elided here.
    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        # A record's key is item_key applied to its tracked item.
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        # Blacklisted keys get special treatment in update_from (its branch
        # body is elided from this view -- confirm against the full file).
        self._blacklist.add(key)

    def update_record(self, key, item):
        # Refresh the tracked item on an existing record in place.
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        # Reconcile tracked records against a fresh listing: refresh items
        # for keys still present; move keys that disappeared to self.orphans.
        # NOTE(review): the loop header over `response` and the handling of
        # seen/new keys are elided from this view.
        unseen = set(self.nodes.iterkeys())
            key = self.item_key(item)
            if key in self._blacklist:
                self.update_record(key, item)
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    # NOTE(review): the `def unpaired(self):` header is elided from this
    # view; this generator yields records not yet paired with their
    # counterpart record type.
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud node records, keyed by the cloud provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Look for a record whose Arvados node modification time is not
        # fresh and whose assignment time is also not fresh (i.e. it has not
        # recently been handed to a boot attempt).
        # NOTE(review): the tail of this condition (the stale_time arguments
        # and closing parens) and the return statements are elided from this
        # view.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                not cnode.timestamp_fresh(record.assignment_time,
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
100 demand, and stops the corresponding ComputeNode actors when their work
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
        # NOTE(review): the end of this signature is elided from this view.
        # The `self.max_total_price` assignment below implies a
        # `max_total_price` parameter closes the list -- confirm.
        super(NodeManagerDaemonActor, self).__init__()
        # Actor classes used to spawn per-node helpers.
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        # Client factories, so each helper actor can get its own API client.
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        # Proxy to ourself: schedules follow-up work through the actor mailbox.
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        # Subscribe to every poll actor and seed its last-poll timestamp so
        # the daemon starts out "stale" until real updates arrive.
        # NOTE(review): the initialization of self.last_polls is elided from
        # this view -- presumably a dict; confirm.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}    # Actor IDs to ComputeNodeSetupActors
        self.booted = {}     # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}  # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")
148 def _update_poll_time(self, poll_key):
149 self.last_polls[poll_key] = time.time()
151 def _pair_nodes(self, node_record, arvados_node):
152 self._logger.info("Cloud node %s has associated with Arvados node %s",
153 node_record.cloud_node.id, arvados_node['uuid'])
154 self._arvados_nodes_actor.subscribe_to(
155 arvados_node['uuid'], node_record.actor.update_arvados_node)
156 node_record.arvados_node = arvados_node
157 self.arvados_nodes.add(node_record)
    def _new_node(self, cloud_node):
        """Start a monitor actor for cloud_node and wrap it in a record."""
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            # NOTE(review): one keyword-argument line between timer_actor and
            # poll_stale_after is elided from this view -- confirm.
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        # Let the daemon know when this node becomes eligible for shutdown.
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        # NOTE(review): callers assign this method's result (see node_up and
        # update_cloud_nodes), so a `return record` appears to be elided here.
    def update_cloud_nodes(self, nodelist):
        """Sync cloud node records with a fresh poll of the cloud provider.

        New nodes get a record (reusing the one from `booted` if we started
        the node ourselves) and an attempt at pairing with unpaired Arvados
        records; nodes missing from the listing are cleaned up as orphans.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                # We booted this node; the cloud listing has caught up.
                record = self.booted.pop(key)
                # NOTE(review): an `else:` line appears elided here -- the
                # line below presumably belongs to that branch.
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
        # Records whose cloud node disappeared from the listing.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                # NOTE(review): `try:` / `pass` scaffolding lines around this
                # stop call are elided from this view.
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                del self.shutdowns[key]
            record.cloud_node = None
    def update_arvados_nodes(self, nodelist):
        """Sync Arvados node records with a fresh poll of the Arvados API.

        Unknown nodes get a new record; then every unpaired Arvados record
        is offered to each unpaired cloud node monitor for pairing.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    # NOTE(review): a `break` after a successful pairing
                    # appears to be elided from this view.
    def _nodes_up(self, size):
        # Count nodes of the given size -- or of any size when size is None --
        # across booting setup actors, booted-but-unlisted nodes, and
        # cloud-listed nodes.
        # NOTE(review): the accumulator lines and the final return are elided
        # from this view; only the generator clauses remain.
                  for c in self.booting.itervalues()
                  if size is None or c.cloud_size.get().id == size.id)
                  for i in (self.booted, self.cloud_nodes.nodes)
                  for c in i.itervalues()
                  if size is None or c.cloud_node.size.id == size.id)
    def _total_price(self):
        # Sum the price of every booting and running node, using
        # server_calculator.find_size to look pricing up by size ID.
        # NOTE(review): the initialization of `cost` and the final return are
        # elided from this view.
        cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
                    for c in self.booting.itervalues())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues())
    def _nodes_busy(self, size):
        # Ask every monitor actor of this size whether its node is 'busy' in
        # one batched pykka.get_all round trip, and count the answers.
        # NOTE(review): the trailing filter line of this expression (the
        # clause consuming `busy` and the closing paren) is elided from this
        # view.
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
244 def _nodes_missing(self, size):
245 return sum(1 for arv_node in
246 pykka.get_all(rec.actor.arvados_node for rec in
247 self.cloud_nodes.nodes.itervalues()
248 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
249 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
251 def _size_wishlist(self, size):
252 return sum(1 for c in self.last_wishlist if c.id == size.id)
    def _size_shutdowns(self, size):
        # Count active shutdown actors whose node matches this size,
        # tolerating actors that died mid-query.
        # NOTE(review): the counter setup, the `try:` line, the increment,
        # the exception-handler body, and the return are elided from this
        # view.
        for c in self.shutdowns.itervalues():
                if c.cloud_node.get().size.id == size.id:
            except pykka.ActorDeadError:
    def _nodes_wanted(self, size):
        """How many nodes of this size to boot (negative: candidates to stop).

        Balances the wishlist against nodes already up, while respecting the
        min_nodes floor (applied to the cheapest size only), the max_nodes
        ceiling, and the max_total_price budget.
        """
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()
        # NOTE(review): the branch handling over_max (an early return?) is
        # elided from this view.
        elif under_min > 0 and size.id == self.min_cloud_size.id:
        # NOTE(review): this branch's body is elided from this view.
        # Nodes effectively available for wishlist work: up, minus those
        # shutting down, busy, or missing from Arvados.
        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
                                           self._nodes_busy(size) +
                                           self._nodes_missing(size))
        wanted = self._size_wishlist(size) - up_count
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            # Clamp to however many nodes the remaining budget can fund.
            can_boot = int((self.max_total_price - total_price) / size.price)
            # NOTE(review): a guard (presumably `if can_boot == 0:`) before
            # this log call is elided from this view.
                self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
        # NOTE(review): the return statements of this method are elided from
        # this view.
289 def _nodes_excess(self, size):
290 up_count = self._nodes_up(size) - self._size_shutdowns(size)
291 if size.id == self.min_cloud_size.id:
292 up_count -= self.min_nodes
293 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
    def update_server_wishlist(self, wishlist):
        """React to a fresh job-queue wishlist by scheduling boots/stops."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            nodes_wanted = self._nodes_wanted(size)
            # NOTE(review): the condition line guarding the start_node call
            # (presumably nodes_wanted > 0) is elided from this view.
                self._later.start_node(size)
            elif (nodes_wanted < 0) and self.booting:
                self._later.stop_booting_node(size)
    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received. The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # NOTE(review): the assignment of `now` (presumably time.time())
            # is elided from this view.
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            # NOTE(review): the stale-information branch and the
            # `return wrapper` line are elided from this view.
    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Boot one node of cloud_size, reusing a stale Arvados record if any."""
        nodes_wanted = self._nodes_wanted(cloud_size)
        # NOTE(review): an early return when nodes_wanted < 1 appears to be
        # elided from this view.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Mark the reused Arvados record as freshly assigned so it is not
            # handed to another boot while this one is in progress.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            # NOTE(review): the right-hand side of this assignment is elided
            # from this view.
        new_setup.subscribe(self._later.node_up)
        # NOTE(review): a guard before this rescheduling call (presumably
        # nodes_wanted > 1) is elided from this view.
            self._later.start_node(cloud_size)
344 def _get_actor_attrs(self, actor, *attr_names):
345 return pykka.get_all([getattr(actor, name) for name in attr_names])
    def node_up(self, setup_proxy):
        """Handle a setup actor finishing its boot of a cloud node."""
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        record = self.cloud_nodes.get(cloud_node.id)
        # NOTE(review): a condition guarding the two lines below (presumably
        # `if record is None:`) is elided from this view.
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        # If the node is still unpaired after boot_fail_after seconds, have
        # it shut down.
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)
    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one in-progress boot of this size when nodes are in excess."""
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            # NOTE(review): this branch's body (an early return?) is elided
            # from this view.
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                # NOTE(review): a guard line before this rescheduling call
                # (and presumably a `break`) is elided from this view.
                    self._later.stop_booting_node(size)
    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for node_actor's cloud node.

        cancellable is forwarded to the shutdown actor; presumably it allows
        the shutdown to be aborted -- confirm against
        ComputeNodeShutdownActor.
        """
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            # NOTE(review): this duplicate-shutdown branch's body (an early
            # return?) is elided from this view.
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)
381 @_check_poll_freshness
382 def node_can_shutdown(self, node_actor):
383 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
384 self._begin_node_shutdown(node_actor, cancellable=True)
    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down a node that never paired with an Arvados record."""
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
        # NOTE(review): the loop-exit handling here (presumably break/else
        # with an early return) is elided from this view.
        if not record.actor.in_state('idle', 'busy').get():
            # Not cancellable: a node that failed to pair is not usable.
            self._begin_node_shutdown(record.actor, cancellable=False)
    def node_finished_shutdown(self, shutdown_actor):
        """Clean up bookkeeping after a shutdown actor reports its outcome."""
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        # NOTE(review): a condition guarding the block below is elided from
        # this view -- the `elif` further down implies a preceding `if`
        # (presumably the unsuccessful-shutdown case).
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                # Never reuse a node the shutdown actor flagged as broken.
                self.cloud_nodes.blacklist(cloud_node_id)
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
        # NOTE(review): the method header for this block (presumably
        # `def shutdown(self):`) is elided from this view; these lines handle
        # daemon shutdown after a signal.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        # Ask every in-progress setup actor to stop if it has no cloud node
        # yet; collect the answer futures first, then resolve them.
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        # Keep only the setups that could not be stopped.
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()
418 def await_shutdown(self):
420 self._timer.schedule(time.time() + 1, self._later.await_shutdown)