from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
28 self._blacklist = set()
30 # Proxy the methods listed below to self.nodes.
31 def _proxy_method(name):
32 method = getattr(dict, name)
33 @functools.wraps(method, ('__name__', '__doc__'))
34 def wrapper(self, *args, **kwargs):
35 return method(self.nodes, *args, **kwargs)
38 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39 locals()[_method_name] = _proxy_method(_method_name)
41 def record_key(self, record):
42 return self.item_key(getattr(record, self.RECORD_ATTR))
44 def add(self, record):
45 self.nodes[self.record_key(record)] = record
47 def blacklist(self, key):
48 self._blacklist.add(key)
50 def update_record(self, key, item):
51 setattr(self.nodes[key], self.RECORD_ATTR, item)
53 def update_from(self, response):
54 unseen = set(self.nodes.iterkeys())
56 key = self.item_key(item)
57 if key in self._blacklist:
61 self.update_record(key, item)
64 self.orphans = {key: self.nodes.pop(key) for key in unseen}
67 return (record for record in self.nodes.itervalues()
68 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track records by cloud node, keyed on the provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track records by Arvados node, keyed on the node record UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return an Arvados node neither updated nor assigned within
        stale_time seconds, or None if every tracked node is fresh.

        NOTE(review): the comparison was truncated in the reviewed text
        (unbalanced parentheses, no return); restored here — confirm
        against the full file.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
103 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104 cloud_nodes_actor, cloud_update_actor, timer_actor,
105 arvados_factory, cloud_factory,
106 shutdown_windows, server_calculator,
107 min_nodes, max_nodes,
108 poll_stale_after=600,
109 boot_fail_after=1800,
110 node_stale_after=7200,
111 node_setup_class=dispatch.ComputeNodeSetupActor,
112 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113 node_actor_class=dispatch.ComputeNodeMonitorActor,
115 super(NodeManagerDaemonActor, self).__init__()
116 self._node_setup = node_setup_class
117 self._node_shutdown = node_shutdown_class
118 self._node_actor = node_actor_class
119 self._cloud_updater = cloud_update_actor
120 self._timer = timer_actor
121 self._new_arvados = arvados_factory
122 self._new_cloud = cloud_factory
123 self._cloud_driver = self._new_cloud()
124 self._logger = logging.getLogger('arvnodeman.daemon')
125 self._later = self.actor_ref.proxy()
126 self.shutdown_windows = shutdown_windows
127 self.server_calculator = server_calculator
128 self.min_cloud_size = self.server_calculator.cheapest_size()
129 self.min_nodes = min_nodes
130 self.max_nodes = max_nodes
131 self.max_total_price = max_total_price
132 self.poll_stale_after = poll_stale_after
133 self.boot_fail_after = boot_fail_after
134 self.node_stale_after = node_stale_after
136 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
137 poll_actor = locals()[poll_name + '_actor']
138 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
139 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
140 self.last_polls[poll_name] = -self.poll_stale_after
141 self.cloud_nodes = _CloudNodeTracker()
142 self.arvados_nodes = _ArvadosNodeTracker()
143 self.booting = {} # Actor IDs to ComputeNodeSetupActors
144 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
145 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
146 self._logger.debug("Daemon initialized")
148 def _update_poll_time(self, poll_key):
149 self.last_polls[poll_key] = time.time()
151 def _pair_nodes(self, node_record, arvados_node):
152 self._logger.info("Cloud node %s has associated with Arvados node %s",
153 node_record.cloud_node.id, arvados_node['uuid'])
154 self._arvados_nodes_actor.subscribe_to(
155 arvados_node['uuid'], node_record.actor.update_arvados_node)
156 node_record.arvados_node = arvados_node
157 self.arvados_nodes.add(node_record)
159 def _new_node(self, cloud_node):
160 start_time = self._cloud_driver.node_start_time(cloud_node)
161 shutdown_timer = cnode.ShutdownTimer(start_time,
162 self.shutdown_windows)
163 actor = self._node_actor.start(
164 cloud_node=cloud_node,
165 cloud_node_start_time=start_time,
166 shutdown_timer=shutdown_timer,
167 cloud_fqdn_func=self._cloud_driver.node_fqdn,
168 update_actor=self._cloud_updater,
169 timer_actor=self._timer,
171 poll_stale_after=self.poll_stale_after,
172 node_stale_after=self.node_stale_after,
173 cloud_client=self._cloud_driver,
174 boot_fail_after=self.boot_fail_after).proxy()
175 actor.subscribe(self._later.node_can_shutdown)
176 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
177 actor.update_cloud_node)
178 record = _ComputeNodeRecord(actor, cloud_node)
181 def update_cloud_nodes(self, nodelist):
182 self._update_poll_time('cloud_nodes')
183 for key, node in self.cloud_nodes.update_from(nodelist):
184 self._logger.info("Registering new cloud node %s", key)
185 if key in self.booted:
186 record = self.booted.pop(key)
188 record = self._new_node(node)
189 self.cloud_nodes.add(record)
190 for arv_rec in self.arvados_nodes.unpaired():
191 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
192 self._pair_nodes(record, arv_rec.arvados_node)
194 for key, record in self.cloud_nodes.orphans.iteritems():
195 if key in self.shutdowns:
197 self.shutdowns[key].stop().get()
198 except pykka.ActorDeadError:
200 del self.shutdowns[key]
202 record.cloud_node = None
204 def update_arvados_nodes(self, nodelist):
205 self._update_poll_time('arvados_nodes')
206 for key, node in self.arvados_nodes.update_from(nodelist):
207 self._logger.info("Registering new Arvados node %s", key)
208 record = _ComputeNodeRecord(arvados_node=node)
209 self.arvados_nodes.add(record)
210 for arv_rec in self.arvados_nodes.unpaired():
211 arv_node = arv_rec.arvados_node
212 for cloud_rec in self.cloud_nodes.unpaired():
213 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
214 self._pair_nodes(cloud_rec, arv_node)
217 def _nodes_up(self, size):
220 for c in self.booting.itervalues()
221 if size is None or c.cloud_size.get().id == size.id)
223 for i in (self.booted, self.cloud_nodes.nodes)
224 for c in i.itervalues()
225 if size is None or c.cloud_node.size.id == size.id)
228 def _total_price(self):
230 cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
231 for c in self.booting.itervalues())
232 cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
233 for i in (self.booted, self.cloud_nodes.nodes)
234 for c in i.itervalues())
237 def _nodes_busy(self, size):
238 return sum(1 for busy in
239 pykka.get_all(rec.actor.in_state('busy') for rec in
240 self.cloud_nodes.nodes.itervalues()
241 if rec.cloud_node.size.id == size.id)
244 def _nodes_missing(self, size):
245 return sum(1 for arv_node in
246 pykka.get_all(rec.actor.arvados_node for rec in
247 self.cloud_nodes.nodes.itervalues()
248 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
249 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
251 def _size_wishlist(self, size):
252 return sum(1 for c in self.last_wishlist if c.id == size.id)
254 def _size_shutdowns(self, size):
256 for c in self.shutdowns.itervalues():
258 if c.cloud_node.get().size.id == size.id:
260 except pykka.ActorDeadError:
264 def _nodes_wanted(self, size):
265 total_up_count = self._nodes_up(None)
266 under_min = self.min_nodes - total_up_count
267 over_max = total_up_count - self.max_nodes
268 total_price = self._total_price()
272 elif under_min > 0 and size.id == self.min_cloud_size.id:
275 up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
276 self._nodes_busy(size) +
277 self._nodes_missing(size))
279 self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
281 wanted = self._size_wishlist(size) - up_count
282 if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
283 can_boot = int((self.max_total_price - total_price) / size.price)
285 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
286 size.name, size.price, self.max_total_price, total_price)
291 def _nodes_excess(self, size):
292 up_count = self._nodes_up(size) - self._size_shutdowns(size)
293 if size.id == self.min_cloud_size.id:
294 up_count -= self.min_nodes
295 return up_count - self._nodes_busy(size) - self._size_wishlist(size)
297 def update_server_wishlist(self, wishlist):
298 self._update_poll_time('server_wishlist')
299 self.last_wishlist = wishlist
300 for size in reversed(self.server_calculator.cloud_sizes):
301 nodes_wanted = self._nodes_wanted(size)
303 self._later.start_node(size)
304 elif (nodes_wanted < 0) and self.booting:
305 self._later.stop_booting_node(size)
307 def _check_poll_freshness(orig_func):
308 """Decorator to inhibit a method when poll information is stale.
310 This decorator checks the timestamps of all the poll information the
311 daemon has received. The decorated method is only called if none
312 of the timestamps are considered stale.
314 @functools.wraps(orig_func)
315 def wrapper(self, *args, **kwargs):
317 if all(now - t < self.poll_stale_after
318 for t in self.last_polls.itervalues()):
319 return orig_func(self, *args, **kwargs)
324 @_check_poll_freshness
325 def start_node(self, cloud_size):
326 nodes_wanted = self._nodes_wanted(cloud_size)
329 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
330 self._logger.info("Want %s more nodes. Booting a %s node.",
331 nodes_wanted, cloud_size.name)
332 new_setup = self._node_setup.start(
333 timer_actor=self._timer,
334 arvados_client=self._new_arvados(),
335 arvados_node=arvados_node,
336 cloud_client=self._new_cloud(),
337 cloud_size=cloud_size).proxy()
338 self.booting[new_setup.actor_ref.actor_urn] = new_setup
339 if arvados_node is not None:
340 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
342 new_setup.subscribe(self._later.node_up)
344 self._later.start_node(cloud_size)
346 def _get_actor_attrs(self, actor, *attr_names):
347 return pykka.get_all([getattr(actor, name) for name in attr_names])
349 def node_up(self, setup_proxy):
350 cloud_node = setup_proxy.cloud_node.get()
351 del self.booting[setup_proxy.actor_ref.actor_urn]
353 record = self.cloud_nodes.get(cloud_node.id)
355 record = self._new_node(cloud_node)
356 self.booted[cloud_node.id] = record
357 self._timer.schedule(time.time() + self.boot_fail_after,
358 self._later.shutdown_unpaired_node, cloud_node.id)
360 @_check_poll_freshness
361 def stop_booting_node(self, size):
362 nodes_excess = self._nodes_excess(size)
363 if (nodes_excess < 1) or not self.booting:
365 for key, node in self.booting.iteritems():
366 if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
367 del self.booting[key]
369 self._later.stop_booting_node(size)
372 def _begin_node_shutdown(self, node_actor, cancellable):
373 cloud_node_id = node_actor.cloud_node.get().id
374 if cloud_node_id in self.shutdowns:
376 shutdown = self._node_shutdown.start(
377 timer_actor=self._timer, cloud_client=self._new_cloud(),
378 arvados_client=self._new_arvados(),
379 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
380 self.shutdowns[cloud_node_id] = shutdown
381 shutdown.subscribe(self._later.node_finished_shutdown)
383 @_check_poll_freshness
384 def node_can_shutdown(self, node_actor):
385 if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
386 self._begin_node_shutdown(node_actor, cancellable=True)
388 def shutdown_unpaired_node(self, cloud_node_id):
389 for record_dict in [self.cloud_nodes, self.booted]:
390 if cloud_node_id in record_dict:
391 record = record_dict[cloud_node_id]
395 if not record.actor.in_state('idle', 'busy').get():
396 self._begin_node_shutdown(record.actor, cancellable=False)
398 def node_finished_shutdown(self, shutdown_actor):
399 cloud_node, success, cancel_reason = self._get_actor_attrs(
400 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
401 shutdown_actor.stop()
402 cloud_node_id = cloud_node.id
404 if cancel_reason == self._node_shutdown.NODE_BROKEN:
405 self.cloud_nodes.blacklist(cloud_node_id)
406 del self.shutdowns[cloud_node_id]
407 elif cloud_node_id in self.booted:
408 self.booted.pop(cloud_node_id).actor.stop()
409 del self.shutdowns[cloud_node_id]
412 self._logger.info("Shutting down after signal.")
413 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
414 setup_stops = {key: node.stop_if_no_cloud_node()
415 for key, node in self.booting.iteritems()}
416 self.booting = {key: self.booting[key]
417 for key in setup_stops if not setup_stops[key].get()}
418 self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check shutdown progress one second from now by re-sending
        # ourselves this message.
        # NOTE(review): this method appears truncated here — there is no
        # visible termination branch (presumably it should stop rescheduling
        # and stop the actor once self.booting drains); confirm against the
        # full file.
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)