3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
29 # Proxy the methods listed below to self.nodes.
30 def _proxy_method(name):
31 method = getattr(dict, name)
32 @functools.wraps(method, ('__name__', '__doc__'))
33 def wrapper(self, *args, **kwargs):
34 return method(self.nodes, *args, **kwargs)
37 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38 locals()[_method_name] = _proxy_method(_method_name)
40 def record_key(self, record):
41 return self.item_key(getattr(record, self.RECORD_ATTR))
43 def add(self, record):
44 self.nodes[self.record_key(record)] = record
46 def update_record(self, key, item):
47 setattr(self.nodes[key], self.RECORD_ATTR, item)
49 def update_from(self, response):
50 unseen = set(self.nodes.iterkeys())
52 key = self.item_key(item)
55 self.update_record(key, item)
58 self.orphans = {key: self.nodes.pop(key) for key in unseen}
61 return (record for record in self.nodes.itervalues()
62 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud provider node records, keyed by cloud node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda node: node.id)
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados API node records, keyed by node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a stale Arvados node record, or None if none qualifies.

        A node is stale when both its modification time and its last
        assignment time are older than `stale_time` seconds (per
        cnode.timestamp_fresh).  BUG FIX: the staleness checks were
        computed but the method never returned a result; restore
        `return node` / `return None`.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, min_size, min_nodes, max_nodes,
101 poll_stale_after=600,
102 boot_fail_after=1800,
103 node_stale_after=7200,
104 node_setup_class=dispatch.ComputeNodeSetupActor,
105 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106 node_actor_class=dispatch.ComputeNodeMonitorActor):
107 super(NodeManagerDaemonActor, self).__init__()
108 self._node_setup = node_setup_class
109 self._node_shutdown = node_shutdown_class
110 self._node_actor = node_actor_class
111 self._cloud_updater = cloud_update_actor
112 self._timer = timer_actor
113 self._new_arvados = arvados_factory
114 self._new_cloud = cloud_factory
115 self._cloud_driver = self._new_cloud()
116 self._logger = logging.getLogger('arvnodeman.daemon')
117 self._later = self.actor_ref.proxy()
118 self.shutdown_windows = shutdown_windows
119 self.min_cloud_size = min_size
120 self.min_nodes = min_nodes
121 self.max_nodes = max_nodes
122 self.poll_stale_after = poll_stale_after
123 self.boot_fail_after = boot_fail_after
124 self.node_stale_after = node_stale_after
126 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127 poll_actor = locals()[poll_name + '_actor']
128 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130 self.last_polls[poll_name] = -self.poll_stale_after
131 self.cloud_nodes = _CloudNodeTracker()
132 self.arvados_nodes = _ArvadosNodeTracker()
133 self.booting = {} # Actor IDs to ComputeNodeSetupActors
134 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
135 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
136 self._logger.debug("Daemon initialized")
138 def _update_poll_time(self, poll_key):
139 self.last_polls[poll_key] = time.time()
141 def _pair_nodes(self, node_record, arvados_node):
142 self._logger.info("Cloud node %s has associated with Arvados node %s",
143 node_record.cloud_node.id, arvados_node['uuid'])
144 self._arvados_nodes_actor.subscribe_to(
145 arvados_node['uuid'], node_record.actor.update_arvados_node)
146 node_record.arvados_node = arvados_node
147 self.arvados_nodes.add(node_record)
149 def _new_node(self, cloud_node):
150 start_time = self._cloud_driver.node_start_time(cloud_node)
151 shutdown_timer = cnode.ShutdownTimer(start_time,
152 self.shutdown_windows)
153 actor = self._node_actor.start(
154 cloud_node=cloud_node,
155 cloud_node_start_time=start_time,
156 shutdown_timer=shutdown_timer,
157 cloud_fqdn_func=self._cloud_driver.node_fqdn,
158 update_actor=self._cloud_updater,
159 timer_actor=self._timer,
161 poll_stale_after=self.poll_stale_after,
162 node_stale_after=self.node_stale_after,
163 cloud_client=self._cloud_driver,
164 boot_fail_after=self.boot_fail_after).proxy()
165 actor.subscribe(self._later.node_can_shutdown)
166 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
167 actor.update_cloud_node)
168 record = _ComputeNodeRecord(actor, cloud_node)
171 def update_cloud_nodes(self, nodelist):
172 self._update_poll_time('cloud_nodes')
173 for key, node in self.cloud_nodes.update_from(nodelist):
174 self._logger.info("Registering new cloud node %s", key)
175 if key in self.booted:
176 record = self.booted.pop(key)
178 record = self._new_node(node)
179 self.cloud_nodes.add(record)
180 for arv_rec in self.arvados_nodes.unpaired():
181 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
182 self._pair_nodes(record, arv_rec.arvados_node)
184 for key, record in self.cloud_nodes.orphans.iteritems():
185 if key in self.shutdowns:
187 self.shutdowns[key].stop().get()
188 except pykka.ActorDeadError:
190 del self.shutdowns[key]
192 record.cloud_node = None
194 def update_arvados_nodes(self, nodelist):
195 self._update_poll_time('arvados_nodes')
196 for key, node in self.arvados_nodes.update_from(nodelist):
197 self._logger.info("Registering new Arvados node %s", key)
198 record = _ComputeNodeRecord(arvados_node=node)
199 self.arvados_nodes.add(record)
200 for arv_rec in self.arvados_nodes.unpaired():
201 arv_node = arv_rec.arvados_node
202 for cloud_rec in self.cloud_nodes.unpaired():
203 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
204 self._pair_nodes(cloud_rec, arv_node)
208 return sum(len(nodelist) for nodelist in
209 [self.cloud_nodes, self.booted, self.booting])
211 def _nodes_busy(self):
212 return sum(1 for busy in
213 pykka.get_all(rec.actor.in_state('busy') for rec in
214 self.cloud_nodes.nodes.itervalues())
217 def _nodes_missing(self):
218 return sum(1 for arv_node in
219 pykka.get_all(rec.actor.arvados_node for rec in
220 self.cloud_nodes.nodes.itervalues()
221 if rec.actor.cloud_node.get().id not in self.shutdowns)
222 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
224 def _nodes_wanted(self):
225 up_count = self._nodes_up()
226 under_min = self.min_nodes - up_count
227 over_max = up_count - self.max_nodes
233 up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
234 return len(self.last_wishlist) - up_count
236 def _nodes_excess(self):
237 up_count = self._nodes_up() - len(self.shutdowns)
238 over_min = up_count - self.min_nodes
242 return up_count - self._nodes_busy() - len(self.last_wishlist)
244 def update_server_wishlist(self, wishlist):
245 self._update_poll_time('server_wishlist')
246 self.last_wishlist = wishlist
247 nodes_wanted = self._nodes_wanted()
249 self._later.start_node()
250 elif (nodes_wanted < 0) and self.booting:
251 self._later.stop_booting_node()
253 def _check_poll_freshness(orig_func):
254 """Decorator to inhibit a method when poll information is stale.
256 This decorator checks the timestamps of all the poll information the
257 daemon has received. The decorated method is only called if none
258 of the timestamps are considered stale.
260 @functools.wraps(orig_func)
261 def wrapper(self, *args, **kwargs):
263 if all(now - t < self.poll_stale_after
264 for t in self.last_polls.itervalues()):
265 return orig_func(self, *args, **kwargs)
270 @_check_poll_freshness
271 def start_node(self):
272 nodes_wanted = self._nodes_wanted()
275 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
277 cloud_size = self.last_wishlist[self._nodes_up()]
279 cloud_size = self.min_cloud_size
280 self._logger.info("Want %s more nodes. Booting a %s node.",
281 nodes_wanted, cloud_size.name)
282 new_setup = self._node_setup.start(
283 timer_actor=self._timer,
284 arvados_client=self._new_arvados(),
285 arvados_node=arvados_node,
286 cloud_client=self._new_cloud(),
287 cloud_size=cloud_size).proxy()
288 self.booting[new_setup.actor_ref.actor_urn] = new_setup
289 if arvados_node is not None:
290 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
292 new_setup.subscribe(self._later.node_up)
294 self._later.start_node()
296 def _get_actor_attrs(self, actor, *attr_names):
297 return pykka.get_all([getattr(actor, name) for name in attr_names])
299 def node_up(self, setup_proxy):
300 cloud_node = setup_proxy.cloud_node.get()
301 del self.booting[setup_proxy.actor_ref.actor_urn]
303 record = self.cloud_nodes.get(cloud_node.id)
305 record = self._new_node(cloud_node)
306 self.booted[cloud_node.id] = record
307 self._timer.schedule(time.time() + self.boot_fail_after,
308 self._later.shutdown_unpaired_node, cloud_node.id)
310 @_check_poll_freshness
311 def stop_booting_node(self):
312 nodes_excess = self._nodes_excess()
313 if (nodes_excess < 1) or not self.booting:
315 for key, node in self.booting.iteritems():
316 if node.stop_if_no_cloud_node().get():
317 del self.booting[key]
319 self._later.stop_booting_node()
322 def _begin_node_shutdown(self, node_actor, cancellable):
323 cloud_node_id = node_actor.cloud_node.get().id
324 if cloud_node_id in self.shutdowns:
326 shutdown = self._node_shutdown.start(
327 timer_actor=self._timer, cloud_client=self._new_cloud(),
328 arvados_client=self._new_arvados(),
329 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
330 self.shutdowns[cloud_node_id] = shutdown
331 shutdown.subscribe(self._later.node_finished_shutdown)
333 @_check_poll_freshness
334 def node_can_shutdown(self, node_actor):
335 if self._nodes_excess() > 0:
336 self._begin_node_shutdown(node_actor, cancellable=True)
338 def shutdown_unpaired_node(self, cloud_node_id):
339 for record_dict in [self.cloud_nodes, self.booted]:
340 if cloud_node_id in record_dict:
341 record = record_dict[cloud_node_id]
345 if not record.actor.in_state('idle', 'busy').get():
346 self._begin_node_shutdown(record.actor, cancellable=False)
348 def node_finished_shutdown(self, shutdown_actor):
349 success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
351 shutdown_actor.stop()
352 cloud_node_id = cloud_node.id
354 del self.shutdowns[cloud_node_id]
355 elif cloud_node_id in self.booted:
356 self.booted.pop(cloud_node_id).actor.stop()
357 del self.shutdowns[cloud_node_id]
360 self._logger.info("Shutting down after signal.")
361 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
362 setup_stops = {key: node.stop_if_no_cloud_node()
363 for key, node in self.booting.iteritems()}
364 self.booting = {key: self.booting[key]
365 for key in setup_stops if not setup_stops[key].get()}
366 self._later.await_shutdown()
368 def await_shutdown(self):
370 self._timer.schedule(time.time() + 1, self._later.await_shutdown)