3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
class _ComputeNodeRecord(object):
    """Bundle of everything we track about one compute node.

    Any of the pieces may be absent (None) until discovered/paired:
    the monitor actor, the cloud provider's node object, the Arvados
    node record, and the time we assigned the Arvados record to a boot.
    """
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # BUG FIX: `actor` was accepted but never stored, although callers
        # build records as _ComputeNodeRecord(actor, cloud_node) and read
        # record.actor throughout the daemon.
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        # -inf means "never assigned", so any staleness check treats an
        # unassigned record as maximally stale.
        self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    """Shared bookkeeping for a keyed collection of node records.

    Subclasses define RECORD_ATTR (which _ComputeNodeRecord attribute is
    tracked here), PAIR_ATTR (the complementary attribute), and item_key
    (maps a tracked item to its dictionary key).
    """
    def __init__(self):
        self.nodes = {}    # key -> _ComputeNodeRecord
        self.orphans = {}  # records dropped by the latest update_from()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # BUG FIX: the wrapper must be returned so the loop below installs
        # it; otherwise every proxied name would be bound to None.
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        """Return the tracking key for a record."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Insert (or replace) a record under its tracking key."""
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        """Refresh the tracked item on an existing record."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Sync against a poll response; yield (key, item) for new items.

        Callers iterate the result to register records for items we have
        not seen before.  Records absent from the response are moved to
        self.orphans.  (keys()/values() instead of iterkeys/itervalues:
        valid under both Python 2 and 3.)
        """
        unseen = set(self.nodes.keys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                # Not tracked yet -- let the caller build a record for it.
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Iterate records not yet paired with their counterpart."""
        return (record for record in self.nodes.values()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks cloud provider node records, keyed by the provider node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Records are keyed by the provider-assigned node ID.
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks Arvados node records, keyed by UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return an Arvados node record that looks abandoned, or None.

        The caller checks the result against None, so the miss case must
        return explicitly.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            # NOTE(review): condition reconstructed from a truncated
            # original -- both the record's modification time AND our own
            # assignment time must be stale before the node is reusable;
            # confirm the `and` against project history.
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None
87 class NodeManagerDaemonActor(actor_class):
88 """Node Manager daemon.
90 This actor subscribes to all information polls about cloud nodes,
91 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
92 for every cloud node, subscribing them to poll updates
93 appropriately. It creates and destroys cloud nodes based on job queue
94 demand, and stops the corresponding ComputeNode actors when their work
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, min_size, min_nodes, max_nodes,
101 poll_stale_after=600,
102 boot_fail_after=1800,
103 node_stale_after=7200,
104 node_setup_class=dispatch.ComputeNodeSetupActor,
105 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106 node_actor_class=dispatch.ComputeNodeMonitorActor):
107 super(NodeManagerDaemonActor, self).__init__()
108 self._node_setup = node_setup_class
109 self._node_shutdown = node_shutdown_class
110 self._node_actor = node_actor_class
111 self._cloud_updater = cloud_update_actor
112 self._timer = timer_actor
113 self._new_arvados = arvados_factory
114 self._new_cloud = cloud_factory
115 self._cloud_driver = self._new_cloud()
116 self._logger = logging.getLogger('arvnodeman.daemon')
117 self._later = self.actor_ref.proxy()
118 self.shutdown_windows = shutdown_windows
119 self.min_cloud_size = min_size
120 self.min_nodes = min_nodes
121 self.max_nodes = max_nodes
122 self.poll_stale_after = poll_stale_after
123 self.boot_fail_after = boot_fail_after
124 self.node_stale_after = node_stale_after
126 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127 poll_actor = locals()[poll_name + '_actor']
128 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130 self.last_polls[poll_name] = -self.poll_stale_after
131 self.cloud_nodes = _CloudNodeTracker()
132 self.arvados_nodes = _ArvadosNodeTracker()
133 self.booting = {} # Actor IDs to ComputeNodeSetupActors
134 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
135 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
136 self._logger.debug("Daemon initialized")
138 def _update_poll_time(self, poll_key):
139 self.last_polls[poll_key] = time.time()
141 def _pair_nodes(self, node_record, arvados_node):
142 self._logger.info("Cloud node %s has associated with Arvados node %s",
143 node_record.cloud_node.id, arvados_node['uuid'])
144 self._arvados_nodes_actor.subscribe_to(
145 arvados_node['uuid'], node_record.actor.update_arvados_node)
146 node_record.arvados_node = arvados_node
147 self.arvados_nodes.add(node_record)
149 def _new_node(self, cloud_node):
150 start_time = self._cloud_driver.node_start_time(cloud_node)
151 shutdown_timer = cnode.ShutdownTimer(start_time,
152 self.shutdown_windows)
153 actor = self._node_actor.start(
154 cloud_node=cloud_node,
155 cloud_node_start_time=start_time,
156 shutdown_timer=shutdown_timer,
157 cloud_fqdn_func=self._cloud_driver.node_fqdn,
158 update_actor=self._cloud_updater,
159 timer_actor=self._timer,
161 poll_stale_after=self.poll_stale_after,
162 node_stale_after=self.node_stale_after,
163 cloud_client=self._cloud_driver,
164 boot_fail_after=self.boot_fail_after).proxy()
165 actor.subscribe(self._later.node_can_shutdown)
166 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
167 actor.update_cloud_node)
168 record = _ComputeNodeRecord(actor, cloud_node)
171 def update_cloud_nodes(self, nodelist):
172 self._update_poll_time('cloud_nodes')
173 for key, node in self.cloud_nodes.update_from(nodelist):
174 self._logger.info("Registering new cloud node %s", key)
175 if key in self.booted:
176 record = self.booted.pop(key)
178 record = self._new_node(node)
179 self.cloud_nodes.add(record)
180 for arv_rec in self.arvados_nodes.unpaired():
181 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
182 self._pair_nodes(record, arv_rec.arvados_node)
184 for key, record in self.cloud_nodes.orphans.iteritems():
186 record.cloud_node = None
187 self.shutdowns.pop(key, None)
189 def update_arvados_nodes(self, nodelist):
190 self._update_poll_time('arvados_nodes')
191 for key, node in self.arvados_nodes.update_from(nodelist):
192 self._logger.info("Registering new Arvados node %s", key)
193 record = _ComputeNodeRecord(arvados_node=node)
194 self.arvados_nodes.add(record)
195 for arv_rec in self.arvados_nodes.unpaired():
196 arv_node = arv_rec.arvados_node
197 for cloud_rec in self.cloud_nodes.unpaired():
198 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
199 self._pair_nodes(cloud_rec, arv_node)
203 return sum(len(nodelist) for nodelist in
204 [self.cloud_nodes, self.booted, self.booting])
206 def _nodes_busy(self):
207 return sum(1 for busy in
208 pykka.get_all(rec.actor.in_state('busy') for rec in
209 self.cloud_nodes.nodes.itervalues())
212 def _nodes_missing(self):
213 return sum(1 for arv_node in
214 pykka.get_all(rec.actor.arvados_node for rec in
215 self.cloud_nodes.nodes.itervalues())
216 if arv_node and arv_node.get("status") == "missing")
    def _nodes_wanted(self):
        """Return how many additional nodes the wishlist calls for.

        May be negative when more nodes are up than the wishlist needs.
        """
        up_count = self._nodes_up()
        # NOTE(review): under_min and over_max are computed but never used
        # below -- early-return clamping branches (boot up to min_nodes,
        # never exceed max_nodes) appear to be missing here; confirm
        # against version history before relying on this method.
        under_min = self.min_nodes - up_count
        over_max = up_count - self.max_nodes
        # Nodes already shutting down, busy, or missing cannot serve new
        # wishlist entries, so remove them from available capacity.
        up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
        return len(self.last_wishlist) - up_count
    def _nodes_excess(self):
        """Return how many nodes beyond current demand are up."""
        # Nodes already shutting down or missing no longer count as up.
        up_count = self._nodes_up() - len(self.shutdowns) - self._nodes_missing()
        # NOTE(review): over_min is computed but never used -- a branch
        # returning it when at or below min_nodes appears to be missing;
        # verify against version history.
        over_min = up_count - self.min_nodes
        # Busy nodes and nodes still needed by the wishlist are not excess.
        return up_count - self._nodes_busy() - len(self.last_wishlist)
238 def update_server_wishlist(self, wishlist):
239 self._update_poll_time('server_wishlist')
240 self.last_wishlist = wishlist
241 nodes_wanted = self._nodes_wanted()
243 self._later.start_node()
244 elif (nodes_wanted < 0) and self.booting:
245 self._later.stop_booting_node()
247 def _check_poll_freshness(orig_func):
248 """Decorator to inhibit a method when poll information is stale.
250 This decorator checks the timestamps of all the poll information the
251 daemon has received. The decorated method is only called if none
252 of the timestamps are considered stale.
254 @functools.wraps(orig_func)
255 def wrapper(self, *args, **kwargs):
257 if all(now - t < self.poll_stale_after
258 for t in self.last_polls.itervalues()):
259 return orig_func(self, *args, **kwargs)
264 @_check_poll_freshness
265 def start_node(self):
266 nodes_wanted = self._nodes_wanted()
269 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
271 cloud_size = self.last_wishlist[self._nodes_up()]
273 cloud_size = self.min_cloud_size
274 self._logger.info("Want %s more nodes. Booting a %s node.",
275 nodes_wanted, cloud_size.name)
276 new_setup = self._node_setup.start(
277 timer_actor=self._timer,
278 arvados_client=self._new_arvados(),
279 arvados_node=arvados_node,
280 cloud_client=self._new_cloud(),
281 cloud_size=cloud_size).proxy()
282 self.booting[new_setup.actor_ref.actor_urn] = new_setup
283 if arvados_node is not None:
284 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
286 new_setup.subscribe(self._later.node_up)
288 self._later.start_node()
290 def _get_actor_attrs(self, actor, *attr_names):
291 return pykka.get_all([getattr(actor, name) for name in attr_names])
293 def node_up(self, setup_proxy):
294 cloud_node = setup_proxy.cloud_node.get()
295 del self.booting[setup_proxy.actor_ref.actor_urn]
297 record = self.cloud_nodes.get(cloud_node.id)
299 record = self._new_node(cloud_node)
300 self.booted[cloud_node.id] = record
301 self._timer.schedule(time.time() + self.boot_fail_after,
302 self._later.shutdown_unpaired_node, cloud_node.id)
304 @_check_poll_freshness
305 def stop_booting_node(self):
306 nodes_excess = self._nodes_excess()
307 if (nodes_excess < 1) or not self.booting:
309 for key, node in self.booting.iteritems():
310 if node.stop_if_no_cloud_node().get():
311 del self.booting[key]
313 self._later.stop_booting_node()
316 def _begin_node_shutdown(self, node_actor, cancellable):
317 cloud_node_id = node_actor.cloud_node.get().id
318 if cloud_node_id in self.shutdowns:
320 shutdown = self._node_shutdown.start(
321 timer_actor=self._timer, cloud_client=self._new_cloud(),
322 arvados_client=self._new_arvados(),
323 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
324 self.shutdowns[cloud_node_id] = shutdown
325 shutdown.subscribe(self._later.node_finished_shutdown)
327 @_check_poll_freshness
328 def node_can_shutdown(self, node_actor):
329 if self._nodes_excess() > 0:
330 self._begin_node_shutdown(node_actor, cancellable=True)
332 def shutdown_unpaired_node(self, cloud_node_id):
333 for record_dict in [self.cloud_nodes, self.booted]:
334 if cloud_node_id in record_dict:
335 record = record_dict[cloud_node_id]
339 if not record.actor.in_state('idle', 'busy').get():
340 self._begin_node_shutdown(record.actor, cancellable=False)
342 def node_finished_shutdown(self, shutdown_actor):
343 success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
345 shutdown_actor.stop()
346 cloud_node_id = cloud_node.id
348 del self.shutdowns[cloud_node_id]
349 elif cloud_node_id in self.booted:
350 self.booted.pop(cloud_node_id).actor.stop()
351 del self.shutdowns[cloud_node_id]
354 self._logger.info("Shutting down after signal.")
355 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
356 setup_stops = {key: node.stop_if_no_cloud_node()
357 for key, node in self.booting.iteritems()}
358 self.booting = {key: self.booting[key]
359 for key in setup_stops if not setup_stops[key].get()}
360 self._later.await_shutdown()
362 def await_shutdown(self):
364 self._timer.schedule(time.time() + 1, self._later.await_shutdown)