3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
24 class _BaseNodeTracker(object):
29 # Proxy the methods listed below to self.nodes.
30 def _proxy_method(name):
31 method = getattr(dict, name)
32 @functools.wraps(method, ('__name__', '__doc__'))
33 def wrapper(self, *args, **kwargs):
34 return method(self.nodes, *args, **kwargs)
37 for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38 locals()[_method_name] = _proxy_method(_method_name)
40 def record_key(self, record):
41 return self.item_key(getattr(record, self.RECORD_ATTR))
43 def add(self, record):
44 self.nodes[self.record_key(record)] = record
46 def update_record(self, key, item):
47 setattr(self.nodes[key], self.RECORD_ATTR, item)
49 def update_from(self, response):
50 unseen = set(self.nodes.iterkeys())
52 key = self.item_key(item)
55 self.update_record(key, item)
58 self.orphans = {key: self.nodes.pop(key) for key in unseen}
61 return (record for record in self.nodes.itervalues()
62 if getattr(record, self.PAIR_ATTR) is None)
65 class _CloudNodeTracker(_BaseNodeTracker):
66 RECORD_ATTR = 'cloud_node'
67 PAIR_ATTR = 'arvados_node'
68 item_key = staticmethod(lambda cloud_node: cloud_node.id)
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72 RECORD_ATTR = 'arvados_node'
73 PAIR_ATTR = 'cloud_node'
74 item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
76 def find_stale_node(self, stale_time):
77 for record in self.nodes.itervalues():
78 node = record.arvados_node
79 if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
81 not cnode.timestamp_fresh(record.assignment_time,
87 class NodeManagerDaemonActor(actor_class):
88 """Node Manager daemon.
90 This actor subscribes to all information polls about cloud nodes,
91 Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
92 for every cloud node, subscribing them to poll updates
93 appropriately. It creates and destroys cloud nodes based on job queue
94 demand, and stops the corresponding ComputeNode actors when their work
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, min_nodes, max_nodes,
101 poll_stale_after=600,
102 boot_fail_after=1800,
103 node_stale_after=7200,
104 node_setup_class=dispatch.ComputeNodeSetupActor,
105 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106 node_actor_class=dispatch.ComputeNodeMonitorActor):
107 super(NodeManagerDaemonActor, self).__init__()
108 self._node_setup = node_setup_class
109 self._node_shutdown = node_shutdown_class
110 self._node_actor = node_actor_class
111 self._cloud_updater = cloud_update_actor
112 self._timer = timer_actor
113 self._new_arvados = arvados_factory
114 self._new_cloud = cloud_factory
115 self._cloud_driver = self._new_cloud()
116 self._logger = logging.getLogger('arvnodeman.daemon')
117 self._later = self.actor_ref.proxy()
118 self.shutdown_windows = shutdown_windows
119 self.min_nodes = min_nodes
120 self.max_nodes = max_nodes
121 self.poll_stale_after = poll_stale_after
122 self.boot_fail_after = boot_fail_after
123 self.node_stale_after = node_stale_after
125 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
126 poll_actor = locals()[poll_name + '_actor']
127 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
128 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
129 self.last_polls[poll_name] = -self.poll_stale_after
130 self.cloud_nodes = _CloudNodeTracker()
131 self.arvados_nodes = _ArvadosNodeTracker()
132 self.booting = {} # Actor IDs to ComputeNodeSetupActors
133 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
134 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
135 self._logger.debug("Daemon initialized")
137 def _update_poll_time(self, poll_key):
138 self.last_polls[poll_key] = time.time()
140 def _pair_nodes(self, node_record, arvados_node):
141 self._logger.info("Cloud node %s has associated with Arvados node %s",
142 node_record.cloud_node.id, arvados_node['uuid'])
143 self._arvados_nodes_actor.subscribe_to(
144 arvados_node['uuid'], node_record.actor.update_arvados_node)
145 node_record.arvados_node = arvados_node
146 self.arvados_nodes.add(node_record)
148 def _new_node(self, cloud_node):
149 start_time = self._cloud_driver.node_start_time(cloud_node)
150 shutdown_timer = cnode.ShutdownTimer(start_time,
151 self.shutdown_windows)
152 actor = self._node_actor.start(
153 cloud_node=cloud_node,
154 cloud_node_start_time=start_time,
155 shutdown_timer=shutdown_timer,
156 update_actor=self._cloud_updater,
157 timer_actor=self._timer,
159 poll_stale_after=self.poll_stale_after,
160 node_stale_after=self.node_stale_after).proxy()
161 actor.subscribe(self._later.node_can_shutdown)
162 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
163 actor.update_cloud_node)
164 record = _ComputeNodeRecord(actor, cloud_node)
167 def update_cloud_nodes(self, nodelist):
168 self._update_poll_time('cloud_nodes')
169 for key, node in self.cloud_nodes.update_from(nodelist):
170 self._logger.info("Registering new cloud node %s", key)
171 if key in self.booted:
172 record = self.booted.pop(key)
174 record = self._new_node(node)
175 self.cloud_nodes.add(record)
176 for arv_rec in self.arvados_nodes.unpaired():
177 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
178 self._pair_nodes(record, arv_rec.arvados_node)
180 for key, record in self.cloud_nodes.orphans.iteritems():
182 self.shutdowns.pop(key, None)
184 def update_arvados_nodes(self, nodelist):
185 self._update_poll_time('arvados_nodes')
186 for key, node in self.arvados_nodes.update_from(nodelist):
187 self._logger.info("Registering new Arvados node %s", key)
188 record = _ComputeNodeRecord(arvados_node=node)
189 self.arvados_nodes.add(record)
190 for arv_rec in self.arvados_nodes.unpaired():
191 arv_node = arv_rec.arvados_node
192 for cloud_rec in self.cloud_nodes.unpaired():
193 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
194 self._pair_nodes(cloud_rec, arv_node)
198 return sum(len(nodelist) for nodelist in
199 [self.cloud_nodes, self.booted, self.booting])
201 def _nodes_busy(self):
202 return sum(1 for idle in
203 pykka.get_all(rec.actor.in_state('idle') for rec in
204 self.cloud_nodes.nodes.itervalues())
207 def _nodes_wanted(self):
208 up_count = self._nodes_up()
209 over_max = up_count - self.max_nodes
213 up_count -= len(self.shutdowns) + self._nodes_busy()
214 return len(self.last_wishlist) - up_count
216 def _nodes_excess(self):
217 up_count = self._nodes_up() - len(self.shutdowns)
218 over_min = up_count - self.min_nodes
222 return up_count - self._nodes_busy() - len(self.last_wishlist)
224 def update_server_wishlist(self, wishlist):
225 self._update_poll_time('server_wishlist')
226 self.last_wishlist = wishlist
227 nodes_wanted = self._nodes_wanted()
229 self._later.start_node()
230 elif (nodes_wanted < 0) and self.booting:
231 self._later.stop_booting_node()
233 def _check_poll_freshness(orig_func):
234 """Decorator to inhibit a method when poll information is stale.
236 This decorator checks the timestamps of all the poll information the
237 daemon has received. The decorated method is only called if none
238 of the timestamps are considered stale.
240 @functools.wraps(orig_func)
241 def wrapper(self, *args, **kwargs):
243 if all(now - t < self.poll_stale_after
244 for t in self.last_polls.itervalues()):
245 return orig_func(self, *args, **kwargs)
250 @_check_poll_freshness
251 def start_node(self):
252 nodes_wanted = self._nodes_wanted()
255 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
256 cloud_size = self.last_wishlist[nodes_wanted - 1]
257 self._logger.info("Want %s more nodes. Booting a %s node.",
258 nodes_wanted, cloud_size.name)
259 new_setup = self._node_setup.start(
260 timer_actor=self._timer,
261 arvados_client=self._new_arvados(),
262 arvados_node=arvados_node,
263 cloud_client=self._new_cloud(),
264 cloud_size=cloud_size).proxy()
265 self.booting[new_setup.actor_ref.actor_urn] = new_setup
266 if arvados_node is not None:
267 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
269 new_setup.subscribe(self._later.node_up)
271 self._later.start_node()
273 def _get_actor_attrs(self, actor, *attr_names):
274 return pykka.get_all([getattr(actor, name) for name in attr_names])
276 def node_up(self, setup_proxy):
277 cloud_node = setup_proxy.cloud_node.get()
278 del self.booting[setup_proxy.actor_ref.actor_urn]
280 record = self.cloud_nodes.get(cloud_node.id)
282 record = self._new_node(cloud_node)
283 self.booted[cloud_node.id] = record
284 self._timer.schedule(time.time() + self.boot_fail_after,
285 self._later.shutdown_unpaired_node, cloud_node.id)
287 @_check_poll_freshness
288 def stop_booting_node(self):
289 nodes_excess = self._nodes_excess()
290 if (nodes_excess < 1) or not self.booting:
292 for key, node in self.booting.iteritems():
293 node.stop_if_no_cloud_node().get()
294 if not node.actor_ref.is_alive():
295 del self.booting[key]
297 self._later.stop_booting_node()
300 def _begin_node_shutdown(self, node_actor, cancellable):
301 cloud_node_id = node_actor.cloud_node.get().id
302 if cloud_node_id in self.shutdowns:
304 shutdown = self._node_shutdown.start(
305 timer_actor=self._timer, cloud_client=self._new_cloud(),
306 node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
307 self.shutdowns[cloud_node_id] = shutdown
308 shutdown.subscribe(self._later.node_finished_shutdown)
310 @_check_poll_freshness
311 def node_can_shutdown(self, node_actor):
312 if self._nodes_excess() > 0:
313 self._begin_node_shutdown(node_actor, cancellable=True)
315 def shutdown_unpaired_node(self, cloud_node_id):
316 for record_dict in [self.cloud_nodes, self.booted]:
317 if cloud_node_id in record_dict:
318 record = record_dict[cloud_node_id]
322 if record.arvados_node is None:
323 self._begin_node_shutdown(record.actor, cancellable=False)
325 def node_finished_shutdown(self, shutdown_actor):
326 success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
328 shutdown_actor.stop()
329 cloud_node_id = cloud_node.id
331 del self.shutdowns[cloud_node_id]
332 elif cloud_node_id in self.booted:
333 self.booted.pop(cloud_node_id).actor.stop()
334 del self.shutdowns[cloud_node_id]
337 self._logger.info("Shutting down after signal.")
338 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
339 for bootnode in self.booting.itervalues():
340 bootnode.stop_if_no_cloud_node()
341 self._later.await_shutdown()
343 def await_shutdown(self):
344 if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
345 self._timer.schedule(time.time() + 1, self._later.await_shutdown)