from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class


class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
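

# A _BaseNodeTracker maps node keys to _ComputeNodeRecords and remembers the
# records that vanished from the latest poll (self.orphans).  Subclasses set
# RECORD_ATTR (the record attribute the key is derived from), PAIR_ATTR (the
# attribute that stays None until the record is paired with its counterpart),
# and item_key (how to turn a raw node object into a hashable key).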
class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    def __getitem__(self, key):
        return self.nodes[key]

    def __len__(self):
        return len(self.nodes)

    def get(self, key, default=None):
        return self.nodes.get(key, default)

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)


class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
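
    # Return an Arvados node whose record has not been updated within
    # stale_time seconds and which has not recently been assigned to a
    # booting cloud node (see start_node), or None if every record is fresh.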
    def find_stale_node(self, stale_time):
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None


class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately. It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, min_nodes, max_nodes,
                 poll_stale_after=600, node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.poll_stale_after = poll_stale_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
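        # Subscribe to the three poll actors and record when each was last
        # heard from.  Seeding last_polls with a stale timestamp keeps
        # _check_poll_freshness from starting or stopping nodes until every
        # poll has reported at least once.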
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()
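
    # Pairing links the record for a cloud VM with the Arvados node object
    # that is running on it, so a single record (and its monitor actor)
    # tracks both halves from then on.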
    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s has associated with Arvados node %s",
                          node_record.cloud_node.id, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)
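
    # Wrap a cloud VM reported by the cloud node poll in a new
    # ComputeNodeMonitorActor, subscribe the daemon to its shutdown offers,
    # and subscribe the actor to updates about that cloud node.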
    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after).proxy()
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record
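
    # Poll handler: register monitor actors for cloud nodes we have not seen
    # before (or adopt the record of a node we booted ourselves), try to pair
    # each new node with an unpaired Arvados node, and clean up records for
    # nodes that disappeared from the cloud listing.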
    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            record.actor.stop()
            self.shutdowns.pop(key, None)

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break
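
    # Node arithmetic used by the start/stop decisions below:
    #   _nodes_up     = cloud nodes + booted + booting, minus pending shutdowns
    #   _nodes_busy   = monitored nodes that are not currently idle
    #   _nodes_wanted = min(wishlist + busy, max_nodes) - nodes up
    #   _nodes_excess = nodes up - max(min_nodes, busy + wishlist)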
    def _nodes_up(self):
        up = sum(len(nodelist) for nodelist in
                 [self.cloud_nodes, self.booted, self.booting])
        return up - len(self.shutdowns)

    def _nodes_busy(self):
        return sum(1 for idle in
                   pykka.get_all(rec.actor.in_state('idle') for rec in
                                 self.cloud_nodes.nodes.itervalues())
                   if idle is False)

    def _nodes_wanted(self):
        return min(len(self.last_wishlist) + self._nodes_busy(),
                   self.max_nodes) - self._nodes_up()

    def _nodes_excess(self):
        needed_nodes = self._nodes_busy() + len(self.last_wishlist)
        return (self._nodes_up() - max(self.min_nodes, needed_nodes))

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted > 0:
            self._later.start_node()
        elif (nodes_wanted < 0) and self.booting:
            self._later.stop_booting_node()

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received. The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper
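
    # Boot one new cloud node: pick the size from the wishlist, reuse a stale
    # Arvados node record if one is available, hand the work to a
    # ComputeNodeSetupActor, and schedule another start_node call if more
    # nodes are still wanted.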
    @_check_poll_freshness
    def start_node(self):
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        cloud_size = self.last_wishlist[nodes_wanted - 1]
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node()

    def _actor_nodes(self, node_actor):
        return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
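
    # Called when a ComputeNodeSetupActor finishes: drop it from the booting
    # table and, if the cloud node poll has not picked the new node up yet,
    # record it ourselves so it can be paired with its Arvados node right away.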
    def node_up(self, setup_proxy):
        cloud_node, arvados_node = self._actor_nodes(setup_proxy)
        del self.booting[setup_proxy.actor_ref.actor_urn]
        setup_proxy.stop()
        record = self.cloud_nodes.get(cloud_node.id)
        if record is None:
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        self._pair_nodes(record, arvados_node)

    @_check_poll_freshness
    def stop_booting_node(self):
        nodes_excess = self._nodes_excess()
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            node.stop_if_no_cloud_node().get()
            if not node.actor_ref.is_alive():
                del self.booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node()
                break
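
    # Monitor actors call this (via their subscription) when their node could
    # be shut down.  Accept the offer only when we have more nodes than we
    # need and the node is not already being shut down.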
    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        if self._nodes_excess() < 1:
            return None
        cloud_node, arvados_node = self._actor_nodes(node_actor)
        if cloud_node.id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(timer_actor=self._timer,
                                             cloud_client=self._new_cloud(),
                                             cloud_node=cloud_node).proxy()
        self.shutdowns[cloud_node.id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)
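
    # Clean up once a ComputeNodeShutdownActor reports that its node is gone:
    # stop the shutdown actor, stop the monitor actor if we were still
    # tracking the node as booted, and forget the pending shutdown.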
    def node_finished_shutdown(self, shutdown_actor):
        cloud_node_id = shutdown_actor.cloud_node.get().id
        shutdown_actor.stop()
        if cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
        del self.shutdowns[cloud_node_id]
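
    # Signal handler entry point: stop starting or retiring nodes, ask any
    # in-progress setup actors to abort if they have not created a cloud node
    # yet, then wait for them to finish before stopping the daemon actor.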
    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        for bootnode in self.booting.itervalues():
            bootnode.stop_if_no_cloud_node()
        self._later.await_shutdown()

    def await_shutdown(self):
        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()