3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker for cloud node records, keyed by each cloud node's `id`."""

    # Records carry the tracked item in `cloud_node`; `arvados_node` is the
    # attribute inspected to decide whether a record is still unpaired.
    PAIR_ATTR = 'arvados_node'
    RECORD_ATTR = 'cloud_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the tracking key of a cloud node: its `id` attribute."""
        return cloud_node.id
# Tracker for Arvados node records.  The tracked item lives in each record's
# 'arvados_node' attribute and is keyed by its 'uuid' entry; 'cloud_node' is
# the attribute named as PAIR_ATTR.
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78 RECORD_ATTR = 'arvados_node'
79 PAIR_ATTR = 'cloud_node'
80 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
# Walk the tracked records, applying cnode.timestamp_fresh() to both the
# node's cnode.arvados_node_mtime() value and the record's assignment_time.
# NOTE(review): the second argument of both timestamp_fresh() calls, the
# operator joining them, and the statements that follow the `if` are not
# visible here, so the exact staleness rule cannot be confirmed from this
# view.  The caller compares the result with None, so presumably a matching
# node (or None) is returned -- confirm against the full file.
82 def find_stale_node(self, stale_time):
83 for record in self.nodes.itervalues():
84 node = record.arvados_node
85 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87 not cnode.timestamp_fresh(record.assignment_time,
93 class NodeManagerDaemonActor(actor_class):
94 """Node Manager daemon.
96 This actor subscribes to all information polls about cloud nodes,
97 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
98 for every cloud node, subscribing them to poll updates
99 appropriately. It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
             cloud_nodes_actor, cloud_update_actor, timer_actor,
             arvados_factory, cloud_factory,
             shutdown_windows, server_calculator,
             min_nodes, max_nodes,
             poll_stale_after=600,
             boot_fail_after=1800,
             node_stale_after=7200,
             node_setup_class=dispatch.ComputeNodeSetupActor,
             node_shutdown_class=dispatch.ComputeNodeShutdownActor,
             node_actor_class=dispatch.ComputeNodeMonitorActor,
             max_total_price=0):
    """Set up the daemon's state and subscribe it to the three polls.

    Arguments:
      server_wishlist_actor, arvados_nodes_actor, cloud_nodes_actor: poll
        actors; the daemon subscribes its update_<poll name> method to each.
      cloud_update_actor, timer_actor: actors handed on to the node actors
        this daemon starts.
      arvados_factory, cloud_factory: callables returning new API clients.
      shutdown_windows: passed to cnode.ShutdownTimer for every new node.
      server_calculator: provides cheapest_size() and cloud_sizes.
      min_nodes, max_nodes: bounds on the total number of nodes.
      poll_stale_after: seconds after which poll data counts as stale.
      boot_fail_after: seconds after boot before shutdown_unpaired_node is
        scheduled for a node.
      node_stale_after: staleness threshold passed to the Arvados node checks.
      node_setup_class, node_shutdown_class, node_actor_class: actor classes
        used to set up, shut down and monitor nodes.
      max_total_price: price ceiling; a false value (the default) disables
        the price check in _nodes_wanted.
    """
    super(NodeManagerDaemonActor, self).__init__()
    self._node_setup = node_setup_class
    self._node_shutdown = node_shutdown_class
    self._node_actor = node_actor_class
    self._cloud_updater = cloud_update_actor
    self._timer = timer_actor
    self._new_arvados = arvados_factory
    self._new_cloud = cloud_factory
    self._cloud_driver = self._new_cloud()
    self._logger = logging.getLogger('arvnodeman.daemon')
    self._later = self.actor_ref.proxy()
    self.shutdown_windows = shutdown_windows
    self.server_calculator = server_calculator
    self.min_cloud_size = self.server_calculator.cheapest_size()
    self.min_nodes = min_nodes
    self.max_nodes = max_nodes
    self.max_total_price = max_total_price
    self.poll_stale_after = poll_stale_after
    self.boot_fail_after = boot_fail_after
    self.node_stale_after = node_stale_after
    # The mapping must exist before the loop below fills it in.
    self.last_polls = {}
    poll_actors = [('server_wishlist', server_wishlist_actor),
                   ('arvados_nodes', arvados_nodes_actor),
                   ('cloud_nodes', cloud_nodes_actor)]
    for poll_name, poll_actor in poll_actors:
        poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
        setattr(self, '_{}_actor'.format(poll_name), poll_actor)
        # Start out stale: nothing guarded by _check_poll_freshness runs
        # until every poll has reported at least once.
        self.last_polls[poll_name] = -self.poll_stale_after
    self.cloud_nodes = _CloudNodeTracker()
    self.arvados_nodes = _ArvadosNodeTracker()
    self.booting = {}       # Actor IDs to ComputeNodeSetupActors
    self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
    self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
    self._logger.debug("Daemon initialized")
148 def _update_poll_time(self, poll_key):
149 self.last_polls[poll_key] = time.time()
151 def _pair_nodes(self, node_record, arvados_node):
152 self._logger.info("Cloud node %s has associated with Arvados node %s",
153 node_record.cloud_node.id, arvados_node['uuid'])
154 self._arvados_nodes_actor.subscribe_to(
155 arvados_node['uuid'], node_record.actor.update_arvados_node)
156 node_record.arvados_node = arvados_node
157 self.arvados_nodes.add(node_record)
def _new_node(self, cloud_node):
    """Start a monitor actor for `cloud_node` and return its record.

    The monitor actor (self._node_actor) is subscribed to cloud node poll
    updates for this node's ID, and the daemon's node_can_shutdown is
    subscribed to the monitor.

    Returns:
      A _ComputeNodeRecord holding the monitor's proxy and the cloud node.
    """
    start_time = self._cloud_driver.node_start_time(cloud_node)
    shutdown_timer = cnode.ShutdownTimer(start_time,
                                         self.shutdown_windows)
    actor = self._node_actor.start(
        cloud_node=cloud_node,
        cloud_node_start_time=start_time,
        shutdown_timer=shutdown_timer,
        cloud_fqdn_func=self._cloud_driver.node_fqdn,
        update_actor=self._cloud_updater,
        timer_actor=self._timer,
        poll_stale_after=self.poll_stale_after,
        node_stale_after=self.node_stale_after,
        cloud_client=self._cloud_driver,
        boot_fail_after=self.boot_fail_after).proxy()
    actor.subscribe(self._later.node_can_shutdown)
    self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                         actor.update_cloud_node)
    # Callers store and use the result, so the record must be handed back.
    return _ComputeNodeRecord(actor, cloud_node)
def update_cloud_nodes(self, nodelist):
    """Poll callback: reconcile tracked cloud nodes with a fresh listing.

    Every cloud node not seen before gets a record: the one created when
    the node finished booting if there is one, otherwise a new one from
    _new_node().  The record is then offered each unpaired Arvados node
    until one is accepted.  Records of nodes that vanished from the listing
    have any pending shutdown actor and their monitor actor stopped.
    """
    self._update_poll_time('cloud_nodes')
    for key, node in self.cloud_nodes.update_from(nodelist):
        self._logger.info("Registering new cloud node %s", key)
        if key in self.booted:
            # Reuse the record (and monitor actor) made in node_up().
            record = self.booted.pop(key)
        else:
            record = self._new_node(node)
        self.cloud_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                self._pair_nodes(record, arv_rec.arvados_node)
                # One Arvados node per cloud node: stop offering.
                break
    for key, record in self.cloud_nodes.orphans.items():
        if key in self.shutdowns:
            try:
                self.shutdowns[key].stop().get()
            except pykka.ActorDeadError:
                # The shutdown actor already finished; nothing to stop.
                pass
            del self.shutdowns[key]
        # The node is gone, so its monitor has no more work to do.
        record.actor.stop()
        record.cloud_node = None
def update_arvados_nodes(self, nodelist):
    """Poll callback: reconcile tracked Arvados nodes with a fresh listing.

    Arvados nodes not seen before get a new record.  Afterwards every
    unpaired Arvados node is offered to the unpaired cloud node records in
    turn, and paired with the first one whose monitor accepts it.
    """
    self._update_poll_time('arvados_nodes')
    for key, node in self.arvados_nodes.update_from(nodelist):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=node)
        self.arvados_nodes.add(record)
    for arv_rec in self.arvados_nodes.unpaired():
        arv_node = arv_rec.arvados_node
        for cloud_rec in self.cloud_nodes.unpaired():
            if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                self._pair_nodes(cloud_rec, arv_node)
                # This Arvados node is taken; do not offer it to any
                # further cloud nodes.
                break
217 def _nodes_up(self, size):
220 for c in self.booting.itervalues()
221 if size is None or c.cloud_size.get().id == size.id)
223 for i in (self.booted, self.cloud_nodes.nodes)
224 for c in i.itervalues()
225 if size is None or c.cloud_node.size.id == size.id)
228 def _total_price(self):
230 cost += sum(c.cloud_size.get().price
231 for c in self.booting.itervalues())
232 cost += sum(c.cloud_node.size.price
233 for i in (self.booted, self.cloud_nodes.nodes)
234 for c in i.itervalues())
def _nodes_busy(self, size):
    """Return how many tracked cloud nodes of `size` report being busy.

    Each matching node's monitor actor is asked in_state('busy'); the
    answers are collected with pykka.get_all() and the true ones counted.
    """
    return sum(1 for busy in
               pykka.get_all(rec.actor.in_state('busy') for rec in
                             self.cloud_nodes.nodes.values()
                             if rec.cloud_node.size.id == size.id)
               # Count only the nodes that answered true, not every node
               # that was asked.
               if busy)
def _nodes_missing(self, size):
    """Count tracked nodes of `size` whose Arvados node is reported missing.

    Nodes with a shutdown in progress are left out.  A node counts when its
    monitor has an Arvados node and cnode.arvados_node_missing() is true
    for it with the daemon's node_stale_after threshold.
    """
    arvados_futures = (
        rec.actor.arvados_node
        for rec in self.cloud_nodes.nodes.itervalues()
        if rec.cloud_node.size.id == size.id
        and rec.actor.cloud_node.get().id not in self.shutdowns)
    missing = 0
    for arv_node in pykka.get_all(arvados_futures):
        if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after):
            missing += 1
    return missing
251 def _size_wishlist(self, size):
252 return sum(1 for c in self.last_wishlist if c.id == size.id)
254 def _size_shutdowns(self, size):
255 return sum(1 for c in self.shutdowns.itervalues()
256 if c.cloud_node.get().size.id == size.id)
# Work out how many more nodes of `size` should be booted, from the node
# limits, the wishlist and the price ceiling.
# NOTE(review): several lines of this method are not visible here: the `if`
# that the `elif` below belongs to, the bodies of both of those branches, the
# condition (if any) guarding the log call, and every `return`.  The comments
# below describe only the statements that are shown.
258 def _nodes_wanted(self, size):
# Totals over every size: _nodes_up(None) applies no size filter.
259 total_up_count = self._nodes_up(None)
260 under_min = self.min_nodes - total_up_count
261 over_max = total_up_count - self.max_nodes
262 total_price = self._total_price()
# min_cloud_size is server_calculator.cheapest_size(), so this branch applies
# only to that size while the total is below min_nodes.
266 elif under_min > 0 and size.id == self.min_cloud_size.id:
# Nodes of this size that are up, minus those shutting down, busy or missing.
269 up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
270 self._nodes_busy(size) +
271 self._nodes_missing(size))
# Shortfall between the wishlist entries for this size and that count.
273 wanted = self._size_wishlist(size) - up_count
# The price ceiling is only checked when max_total_price is truthy and booting
# all `wanted` nodes would push the total price above it.
274 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
# Whole number of nodes of this size that still fit under the ceiling.
275 can_boot = int((self.max_total_price - total_price) / size.price)
277 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
278 size.name, size.price, self.max_total_price, total_price)
283 def _nodes_excess(self, size):
284 up_count = self._nodes_up(size) - self._size_shutdowns(size)
285 if size.id == self.min_cloud_size.id:
286 up_count -= self.min_nodes
287 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
# Poll callback for the server wishlist: record the poll time, remember the
# wishlist, then request node starts or booting-node stops according to the
# sign of _nodes_wanted() for each size.
# NOTE(review): the loop variable is `sz` but the body uses `size`, which is
# not assigned in the visible lines, and the `if` that pairs with the `elif`
# below is also missing from this view -- confirm against the full file.
289 def update_server_wishlist(self, wishlist):
290 self._update_poll_time('server_wishlist')
291 self.last_wishlist = wishlist
# Sizes are visited in the reverse of server_calculator.cloud_sizes order.
292 for sz in reversed(self.server_calculator.cloud_sizes):
294 nodes_wanted = self._nodes_wanted(size)
# Requests are sent through self._later, the daemon's proxy for itself.
296 self._later.start_node(size)
# A negative count only matters while some node is still booting.
297 elif (nodes_wanted < 0) and self.booting:
298 self._later.stop_booting_node(size)
300 def _check_poll_freshness(orig_func):
301 """Decorator to inhibit a method when poll information is stale.
303 This decorator checks the timestamps of all the poll information the
304 daemon has received. The decorated method is only called if none
305 of the timestamps are considered stale.
307 @functools.wraps(orig_func)
308 def wrapper(self, *args, **kwargs):
310 if all(now - t < self.poll_stale_after
311 for t in self.last_polls.itervalues()):
312 return orig_func(self, *args, **kwargs)
# Boot one node of `cloud_size`: start a setup actor for it, remember the
# actor in self.booting, and subscribe node_up to it.
# NOTE(review): lines are missing from this view after the _nodes_wanted()
# call, inside the assignment_time assignment, and before the final
# start_node request, so when the method returns early and when it
# reschedules itself cannot be confirmed from here.
317 @_check_poll_freshness
318 def start_node(self, cloud_size):
319 nodes_wanted = self._nodes_wanted(cloud_size)
# Ask the Arvados tracker for a stale node record; may be None (checked below).
322 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
323 self._logger.info("Want %s more nodes. Booting a %s node.",
324 nodes_wanted, cloud_size.name)
# The setup actor gets fresh Arvados and cloud clients from the factories.
325 new_setup = self._node_setup.start(
326 timer_actor=self._timer,
327 arvados_client=self._new_arvados(),
328 arvados_node=arvados_node,
329 cloud_client=self._new_cloud(),
330 cloud_size=cloud_size).proxy()
# Setup actors are tracked by actor URN until node_up removes them.
331 self.booting[new_setup.actor_ref.actor_urn] = new_setup
# Update the tracked record's assignment_time for the node just handed out
# (the assigned value is cut off in this view).
332 if arvados_node is not None:
333 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
335 new_setup.subscribe(self._later.node_up)
# Queue another start_node request through the daemon's own proxy.
337 self._later.start_node(cloud_size)
def _get_actor_attrs(self, actor, *attr_names):
    """Read the named attributes from `actor` and resolve them together.

    Returns what pykka.get_all() gives for the attributes, in the order of
    `attr_names`.
    """
    pending = []
    for name in attr_names:
        pending.append(getattr(actor, name))
    return pykka.get_all(pending)
def node_up(self, setup_proxy):
    """Callback from a setup actor whose cloud node has been created.

    The setup actor is dropped from self.booting and stopped.  Unless the
    cloud node poll already produced a record for this node, a monitor is
    started for it and its record is kept in self.booted until the poll
    catches up.  Finally shutdown_unpaired_node is scheduled for the node
    boot_fail_after seconds from now.
    """
    cloud_node = setup_proxy.cloud_node.get()
    del self.booting[setup_proxy.actor_ref.actor_urn]
    # The setup actor's work is done; stop it like node_finished_shutdown
    # stops a finished shutdown actor.
    setup_proxy.stop()
    record = self.cloud_nodes.get(cloud_node.id)
    if record is None:
        # Only nodes the tracker does not know yet need a new monitor;
        # creating one unconditionally would duplicate the actor and make
        # _nodes_up() count the node twice.
        record = self._new_node(cloud_node)
        self.booted[cloud_node.id] = record
    self._timer.schedule(time.time() + self.boot_fail_after,
                         self._later.shutdown_unpaired_node, cloud_node.id)
# Cancel one still-booting node of `size` when there are more nodes of that
# size than needed.
# NOTE(review): the body of the first `if`, and whatever follows the `del`
# (loop exit, condition on the reschedule), are not visible in this view, so
# the control flow around them cannot be confirmed from here.
353 @_check_poll_freshness
354 def stop_booting_node(self, size):
355 nodes_excess = self._nodes_excess(size)
# Nothing to do without excess nodes or without any node still booting.
356 if (nodes_excess < 1) or not self.booting:
358 for key, node in self.booting.iteritems():
# A setup actor qualifies when it boots this size and its
# stop_if_no_cloud_node() call returns a true result.
359 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
360 del self.booting[key]
# Queue another stop request through the daemon's own proxy.
362 self._later.stop_booting_node(size)
365 def _begin_node_shutdown(self, node_actor, cancellable):
366 cloud_node_id = node_actor.cloud_node.get().id
367 if cloud_node_id in self.shutdowns:
369 shutdown = self._node_shutdown.start(
370 timer_actor=self._timer, cloud_client=self._new_cloud(),
371 arvados_client=self._new_arvados(),
372 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
373 self.shutdowns[cloud_node_id] = shutdown
374 shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Callback from a monitor actor whose node is eligible for shutdown.

    A cancellable shutdown is begun only when there are excess nodes of the
    node's size.
    """
    node_size = node_actor.cloud_node.get().size
    if self._nodes_excess(node_size) <= 0:
        return
    self._begin_node_shutdown(node_actor, cancellable=True)
def shutdown_unpaired_node(self, cloud_node_id):
    """Shut down a node whose monitor is in neither 'idle' nor 'busy' state.

    The node's record is looked up in the cloud node tracker first and then
    among the booted records.  An ID found in neither is ignored.  The
    shutdown, when begun, is not cancellable.
    """
    for record_dict in [self.cloud_nodes, self.booted]:
        if cloud_node_id in record_dict:
            record = record_dict[cloud_node_id]
            break
    else:
        # The node is no longer known; there is nothing left to shut down.
        return None
    if not record.actor.in_state('idle', 'busy').get():
        self._begin_node_shutdown(record.actor, cancellable=False)
# Callback from a shutdown actor that has finished: read its outcome, stop
# the actor, and tidy up the daemon's bookkeeping for that cloud node.
# NOTE(review): indentation and at least one line are missing between the
# cloud_node_id assignment and the first `if` in this view, and `success` is
# never used in the visible lines, so how the branches below nest (and
# whether they depend on `success`) cannot be confirmed from here.
391 def node_finished_shutdown(self, shutdown_actor):
# Fetch the three result attributes together via _get_actor_attrs().
392 cloud_node, success, cancel_reason = self._get_actor_attrs(
393 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
394 shutdown_actor.stop()
395 cloud_node_id = cloud_node.id
# A shutdown cancelled with reason NODE_BROKEN gets the node blacklisted in
# the cloud node tracker, so later cloud listings ignore it.
397 if cancel_reason == self._node_shutdown.NODE_BROKEN:
398 self.cloud_nodes.blacklist(cloud_node_id)
399 del self.shutdowns[cloud_node_id]
# A node still held in self.booted has its record removed and its monitor
# actor stopped.
400 elif cloud_node_id in self.booted:
401 self.booted.pop(cloud_node_id).actor.stop()
402 del self.shutdowns[cloud_node_id]
# NOTE(review): the `def` line that introduces these statements is not visible
# in this view; judging by the log message they form the daemon's own shutdown
# routine rather than part of node_finished_shutdown -- confirm against the
# full file.
405 self._logger.info("Shutting down after signal.")
# With a negative threshold the freshness test in _check_poll_freshness
# (now - t < poll_stale_after) fails for every past poll time, so the methods
# it guards stop doing anything.
406 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
# Ask every setup actor to stop if it has no cloud node yet; all requests are
# issued before any result is waited for.
407 setup_stops = {key: node.stop_if_no_cloud_node()
408 for key, node in self.booting.iteritems()}
# Keep only the setup actors whose stop request returned a false result.
409 self.booting = {key: self.booting[key]
410 for key in setup_stops if not setup_stops[key].get()}
411 self._later.await_shutdown()
413 def await_shutdown(self):
415 self._timer.schedule(time.time() + 1, self._later.await_shutdown)