3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
29 # Proxy the methods listed below to self.nodes.
30 def _proxy_method(name):
31 method = getattr(dict, name)
32 @functools.wraps(method, ('__name__', '__doc__'))
33 def wrapper(self, *args, **kwargs):
34 return method(self.nodes, *args, **kwargs)
37 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38 locals()[_method_name] = _proxy_method(_method_name)
40 def record_key(self, record):
41 return self.item_key(getattr(record, self.RECORD_ATTR))
43 def add(self, record):
44 self.nodes[self.record_key(record)] = record
46 def update_record(self, key, item):
47 setattr(self.nodes[key], self.RECORD_ATTR, item)
49 def update_from(self, response):
50 unseen = set(self.nodes.iterkeys())
52 key = self.item_key(item)
55 self.update_record(key, item)
58 self.orphans = {key: self.nodes.pop(key) for key in unseen}
61 return (record for record in self.nodes.itervalues()
62 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks node records keyed by the cloud provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Cloud library node objects carry their provider ID as .id.
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks node records keyed by the Arvados node record's UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Looks for an Arvados node record that is stale: neither updated
        # nor assigned to a booting node recently.
        # NOTE(review): the continuation lines of this condition (presumably
        # passing `stale_time` as each freshness threshold) and the return
        # statement(s) appear to be elided — recover them from version
        # control before relying on this method.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                not cnode.timestamp_fresh(record.assignment_time,
87 class NodeManagerDaemonActor(actor_class):
88 """Node Manager daemon.
90 This actor subscribes to all information polls about cloud nodes,
91 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
92 for every cloud node, subscribing them to poll updates
93 appropriately. It creates and destroys cloud nodes based on job queue
94 demand, and stops the corresponding ComputeNode actors when their work
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, min_size, min_nodes, max_nodes,
101 poll_stale_after=600,
102 boot_fail_after=1800,
103 node_stale_after=7200,
104 node_setup_class=dispatch.ComputeNodeSetupActor,
105 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106 node_actor_class=dispatch.ComputeNodeMonitorActor):
107 super(NodeManagerDaemonActor, self).__init__()
108 self._node_setup = node_setup_class
109 self._node_shutdown = node_shutdown_class
110 self._node_actor = node_actor_class
111 self._cloud_updater = cloud_update_actor
112 self._timer = timer_actor
113 self._new_arvados = arvados_factory
114 self._new_cloud = cloud_factory
115 self._cloud_driver = self._new_cloud()
116 self._logger = logging.getLogger('arvnodeman.daemon')
117 self._later = self.actor_ref.proxy()
118 self.shutdown_windows = shutdown_windows
119 self.min_cloud_size = min_size
120 self.min_nodes = min_nodes
121 self.max_nodes = max_nodes
122 self.poll_stale_after = poll_stale_after
123 self.boot_fail_after = boot_fail_after
124 self.node_stale_after = node_stale_after
126 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127 poll_actor = locals()[poll_name + '_actor']
128 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130 self.last_polls[poll_name] = -self.poll_stale_after
131 self.cloud_nodes = _CloudNodeTracker()
132 self.arvados_nodes = _ArvadosNodeTracker()
133 self.booting = {} # Actor IDs to ComputeNodeSetupActors
134 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
135 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
136 self._logger.debug("Daemon initialized")
138 def _update_poll_time(self, poll_key):
139 self.last_polls[poll_key] = time.time()
141 def _pair_nodes(self, node_record, arvados_node):
142 self._logger.info("Cloud node %s has associated with Arvados node %s",
143 node_record.cloud_node.id, arvados_node['uuid'])
144 self._arvados_nodes_actor.subscribe_to(
145 arvados_node['uuid'], node_record.actor.update_arvados_node)
146 node_record.arvados_node = arvados_node
147 self.arvados_nodes.add(node_record)
    def _new_node(self, cloud_node):
        """Create, subscribe, and wrap a monitor actor for cloud_node."""
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            # NOTE(review): a keyword argument appears to be elided here —
            # recover it from version control.
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after).proxy()
        # Let the monitor tell the daemon when its node becomes eligible
        # for shutdown, and feed it cloud poll updates for its node.
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        # NOTE(review): callers (update_cloud_nodes, node_up) use this
        # method's return value, but no `return record` is visible here —
        # the line appears to be elided.
    def update_cloud_nodes(self, nodelist):
        """Sync tracked records with the latest cloud node listing."""
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                # A node we booted ourselves has now shown up in a listing.
                record = self.booted.pop(key)
            # NOTE(review): an `else:` introducing the next line appears to
            # be elided (new_node path for nodes we did not boot).
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            # Offer this cloud node to every not-yet-paired Arvados record.
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    # NOTE(review): a `break` after pairing appears elided.
        # Records whose cloud nodes vanished from the listing.
        for key, record in self.cloud_nodes.orphans.iteritems():
            # NOTE(review): a line stopping the record's actor appears to
            # be elided here.
            record.cloud_node = None
            self.shutdowns.pop(key, None)
    def update_arvados_nodes(self, nodelist):
        """Sync tracked records with the latest Arvados node listing."""
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        # Try to pair each unpaired Arvados node with an unpaired cloud node.
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    # NOTE(review): a `break` after a successful pairing
                    # appears to be elided (otherwise this Arvados node
                    # would keep being offered to further cloud nodes).
        # NOTE(review): the `def _nodes_up(self):` header for this method
        # appears to be elided; callers invoke self._nodes_up().
        # Counts every node that is up or on its way up: listed cloud
        # nodes, booted-but-unlisted nodes, and boots still in progress.
        return sum(len(nodelist) for nodelist in
                   [self.cloud_nodes, self.booted, self.booting])
    def _nodes_busy(self):
        """Count tracked cloud nodes currently reported in the 'busy' state."""
        # pykka.get_all resolves all the in_state futures in one pass.
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues())
        # NOTE(review): the generator's trailing filter (`if busy)`) appears
        # to be elided, leaving this expression unterminated.
    def _nodes_wanted(self):
        """Return how many more nodes demand calls for (may be negative)."""
        up_count = self._nodes_up()
        under_min = self.min_nodes - up_count
        over_max = up_count - self.max_nodes
        # NOTE(review): branches clamping the result to the min_nodes /
        # max_nodes limits (using under_min and over_max) appear to be
        # elided here.
        # Busy and shutting-down nodes cannot absorb wishlist demand.
        up_count -= len(self.shutdowns) + self._nodes_busy()
        return len(self.last_wishlist) - up_count
    def _nodes_excess(self):
        """Return how many nodes exceed current demand (may be negative)."""
        up_count = self._nodes_up() - len(self.shutdowns)
        over_min = up_count - self.min_nodes
        # NOTE(review): a branch using over_min (enforcing the min_nodes
        # floor) appears to be elided here.
        return up_count - self._nodes_busy() - len(self.last_wishlist)
230 def update_server_wishlist(self, wishlist):
231 self._update_poll_time('server_wishlist')
232 self.last_wishlist = wishlist
233 nodes_wanted = self._nodes_wanted()
235 self._later.start_node()
236 elif (nodes_wanted < 0) and self.booting:
237 self._later.stop_booting_node()
    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received. The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # NOTE(review): a binding for `now` (presumably time.time())
            # appears to be elided here.
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            # NOTE(review): the stale-poll branch and the `return wrapper`
            # statement closing this decorator appear to be elided.
    @_check_poll_freshness
    def start_node(self):
        """Boot one new compute node to help satisfy wishlist demand."""
        nodes_wanted = self._nodes_wanted()
        # NOTE(review): an early exit when nodes_wanted < 1 appears elided.
        # Reuse a stale Arvados node record for the new node if available.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        # NOTE(review): the two assignments below look like a try/except
        # IndexError pair (wishlist entry if present, else minimum size)
        # whose try/except lines are elided.
        cloud_size = self.last_wishlist[self._nodes_up()]
        cloud_size = self.min_cloud_size
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        # Track the setup actor until its node appears in a cloud listing.
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Remember when this record was assigned so find_stale_node can
            # reclaim it if the boot never pairs.
            # NOTE(review): the right-hand side's continuation (presumably
            # time.time()) appears to be elided.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
        new_setup.subscribe(self._later.node_up)
        # NOTE(review): a guard (likely `if nodes_wanted > 1:`) before this
        # re-invocation appears to be elided.
        self._later.start_node()
282 def _get_actor_attrs(self, actor, *attr_names):
283 return pykka.get_all([getattr(actor, name) for name in attr_names])
    def node_up(self, setup_proxy):
        """Handle a setup actor reporting that its cloud node was created."""
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        # NOTE(review): a line stopping setup_proxy appears to be elided.
        record = self.cloud_nodes.get(cloud_node.id)
        # NOTE(review): an `if record is None:` guard appears to be elided
        # (only build a new record when the node is not already listed).
        record = self._new_node(cloud_node)
        self.booted[cloud_node.id] = record
        # Give the node until boot_fail_after seconds to pair with an
        # Arvados node before considering it for shutdown.
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)
    @_check_poll_freshness
    def stop_booting_node(self):
        """Cancel one in-progress boot that has no cloud node yet."""
        nodes_excess = self._nodes_excess()
        if (nodes_excess < 1) or not self.booting:
            # NOTE(review): this guard's body (a return) appears elided.
        for key, node in self.booting.iteritems():
            if node.stop_if_no_cloud_node().get():
                del self.booting[key]
                # NOTE(review): a `break` likely follows here (deleting
                # while iterating iteritems() would otherwise raise) but
                # appears to be elided.
        self._later.stop_booting_node()
    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for node_actor's cloud node.

        cancellable: whether the shutdown may be aborted if the node picks
        up work again (True for excess-capacity shutdowns, False for
        boot-failure shutdowns).
        """
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            # NOTE(review): this guard's body (a return, so two shutdown
            # actors are never started for one node) appears to be elided.
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)
319 @_check_poll_freshness
320 def node_can_shutdown(self, node_actor):
321 if self._nodes_excess() > 0:
322 self._begin_node_shutdown(node_actor, cancellable=True)
    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down a booted node that never paired with an Arvados node."""
        # Look for the node among both listed and booted-but-unlisted nodes.
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                # NOTE(review): a `break` here, and a for/else `return` for
                # the not-found case, appear to be elided.
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)
    def node_finished_shutdown(self, shutdown_actor):
        """Clean up after a shutdown actor reports success or failure."""
        # NOTE(review): this call's second attribute name (presumably
        # 'cloud_node') and closing parenthesis appear to be elided.
        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        # NOTE(review): an `if not success:` header appears to be elided:
        # a failed shutdown forgets its shutdown actor (allowing a retry),
        # while success for a booted node also stops its monitor actor.
        del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
        # NOTE(review): the `def shutdown(self):` header for this signal
        # handler appears to be elided.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1 # Inhibit starting/stopping nodes
        # Ask every setup actor without a cloud node yet to stop; keep only
        # those that could not be stopped (their cloud node already exists).
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()
354 def await_shutdown(self):
356 self._timer.schedule(time.time() + 1, self._later.await_shutdown)