from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .config import actor_class
14 class _ComputeNodeRecord(object):
15 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16 assignment_time=float('-inf')):
18 self.cloud_node = cloud_node
19 self.arvados_node = arvados_node
20 self.assignment_time = assignment_time
23 class _BaseNodeTracker(object):
28 def __getitem__(self, key):
29 return self.nodes[key]
32 return len(self.nodes)
34 def get(self, key, default=None):
35 return self.nodes.get(key, default)
37 def record_key(self, record):
38 return self.item_key(getattr(record, self.RECORD_ATTR))
40 def add(self, record):
41 self.nodes[self.record_key(record)] = record
43 def update_record(self, key, item):
44 setattr(self.nodes[key], self.RECORD_ATTR, item)
46 def update_from(self, response):
47 unseen = set(self.nodes.iterkeys())
49 key = self.item_key(item)
52 self.update_record(key, item)
55 self.orphans = {key: self.nodes.pop(key) for key in unseen}
58 return (record for record in self.nodes.itervalues()
59 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks cloud provider nodes, keyed by the provider's node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks Arvados node records, keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a tracked Arvados node that looks stale, or None.

        A node is stale when both its modification time and the record's
        assignment time fail cnode.timestamp_fresh for `stale_time`.
        (Condition and returns reconstructed from a truncated source;
        the caller checks the result against None.)
        """
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
94 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95 cloud_nodes_actor, cloud_update_actor, timer_actor,
96 arvados_factory, cloud_factory,
97 shutdown_windows, max_nodes,
98 poll_stale_after=600, node_stale_after=7200,
99 node_setup_class=cnode.ComputeNodeSetupActor,
100 node_shutdown_class=cnode.ComputeNodeShutdownActor,
101 node_actor_class=cnode.ComputeNodeMonitorActor):
102 super(NodeManagerDaemonActor, self).__init__()
103 self._node_setup = node_setup_class
104 self._node_shutdown = node_shutdown_class
105 self._node_actor = node_actor_class
106 self._cloud_updater = cloud_update_actor
107 self._timer = timer_actor
108 self._new_arvados = arvados_factory
109 self._new_cloud = cloud_factory
110 self._cloud_driver = self._new_cloud()
111 self._logger = logging.getLogger('arvnodeman.daemon')
112 self._later = self.actor_ref.proxy()
113 self.shutdown_windows = shutdown_windows
114 self.max_nodes = max_nodes
115 self.poll_stale_after = poll_stale_after
116 self.node_stale_after = node_stale_after
118 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
119 poll_actor = locals()[poll_name + '_actor']
120 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
121 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
122 self.last_polls[poll_name] = -self.poll_stale_after
123 self.cloud_nodes = _CloudNodeTracker()
124 self.arvados_nodes = _ArvadosNodeTracker()
125 self.booting = {} # Actor IDs to ComputeNodeSetupActors
126 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
127 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
128 self._logger.debug("Daemon initialized")
130 def _update_poll_time(self, poll_key):
131 self.last_polls[poll_key] = time.time()
133 def _pair_nodes(self, node_record, arvados_node):
134 self._logger.info("Cloud node %s has associated with Arvados node %s",
135 node_record.cloud_node.id, arvados_node['uuid'])
136 self._arvados_nodes_actor.subscribe_to(
137 arvados_node['uuid'], node_record.actor.update_arvados_node)
138 node_record.arvados_node = arvados_node
139 self.arvados_nodes.add(node_record)
141 def _new_node(self, cloud_node):
142 start_time = self._cloud_driver.node_start_time(cloud_node)
143 shutdown_timer = cnode.ShutdownTimer(start_time,
144 self.shutdown_windows)
145 actor = self._node_actor.start(
146 cloud_node=cloud_node,
147 cloud_node_start_time=start_time,
148 shutdown_timer=shutdown_timer,
149 update_actor=self._cloud_updater,
150 timer_actor=self._timer,
152 poll_stale_after=self.poll_stale_after,
153 node_stale_after=self.node_stale_after).proxy()
154 actor.subscribe(self._later.node_can_shutdown)
155 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
156 actor.update_cloud_node)
157 record = _ComputeNodeRecord(actor, cloud_node)
160 def update_cloud_nodes(self, nodelist):
161 self._update_poll_time('cloud_nodes')
162 for key, node in self.cloud_nodes.update_from(nodelist):
163 self._logger.info("Registering new cloud node %s", key)
164 if key in self.booted:
165 record = self.booted.pop(key)
167 record = self._new_node(node)
168 self.cloud_nodes.add(record)
169 for arv_rec in self.arvados_nodes.unpaired():
170 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
171 self._pair_nodes(record, arv_rec.arvados_node)
173 for key, record in self.cloud_nodes.orphans.iteritems():
175 self.shutdowns.pop(key, None)
177 def update_arvados_nodes(self, nodelist):
178 self._update_poll_time('arvados_nodes')
179 for key, node in self.arvados_nodes.update_from(nodelist):
180 self._logger.info("Registering new Arvados node %s", key)
181 record = _ComputeNodeRecord(arvados_node=node)
182 self.arvados_nodes.add(record)
183 for arv_rec in self.arvados_nodes.unpaired():
184 arv_node = arv_rec.arvados_node
185 for cloud_rec in self.cloud_nodes.unpaired():
186 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
187 self._pair_nodes(cloud_rec, arv_node)
191 up = sum(len(nodelist) for nodelist in
192 [self.cloud_nodes, self.booted, self.booting])
193 return up - len(self.shutdowns)
195 def _nodes_busy(self):
196 return sum(1 for idle in
197 pykka.get_all(rec.actor.in_state('idle') for rec in
198 self.cloud_nodes.nodes.itervalues())
201 def _nodes_wanted(self):
202 return min(len(self.last_wishlist) + self._nodes_busy(),
203 self.max_nodes) - self._nodes_up()
205 def _nodes_excess(self):
206 return self._nodes_up() - self._nodes_busy() - len(self.last_wishlist)
208 def update_server_wishlist(self, wishlist):
209 self._update_poll_time('server_wishlist')
210 self.last_wishlist = wishlist
211 nodes_wanted = self._nodes_wanted()
213 self._later.start_node()
214 elif (nodes_wanted < 0) and self.booting:
215 self._later.stop_booting_node()
217 def _check_poll_freshness(orig_func):
218 """Decorator to inhibit a method when poll information is stale.
220 This decorator checks the timestamps of all the poll information the
221 daemon has received. The decorated method is only called if none
222 of the timestamps are considered stale.
224 @functools.wraps(orig_func)
225 def wrapper(self, *args, **kwargs):
227 if all(now - t < self.poll_stale_after
228 for t in self.last_polls.itervalues()):
229 return orig_func(self, *args, **kwargs)
234 @_check_poll_freshness
235 def start_node(self):
236 nodes_wanted = self._nodes_wanted()
239 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
240 cloud_size = self.last_wishlist[nodes_wanted - 1]
241 self._logger.info("Want %s more nodes. Booting a %s node.",
242 nodes_wanted, cloud_size.name)
243 new_setup = self._node_setup.start(
244 timer_actor=self._timer,
245 arvados_client=self._new_arvados(),
246 arvados_node=arvados_node,
247 cloud_client=self._new_cloud(),
248 cloud_size=cloud_size).proxy()
249 self.booting[new_setup.actor_ref.actor_urn] = new_setup
250 if arvados_node is not None:
251 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
253 new_setup.subscribe(self._later.node_up)
255 self._later.start_node()
257 def _actor_nodes(self, node_actor):
258 return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
260 def node_up(self, setup_proxy):
261 cloud_node, arvados_node = self._actor_nodes(setup_proxy)
262 del self.booting[setup_proxy.actor_ref.actor_urn]
264 record = self.cloud_nodes.get(cloud_node.id)
266 record = self._new_node(cloud_node)
267 self.booted[cloud_node.id] = record
268 self._pair_nodes(record, arvados_node)
270 @_check_poll_freshness
271 def stop_booting_node(self):
272 nodes_excess = self._nodes_excess()
273 if (nodes_excess < 1) or not self.booting:
275 for key, node in self.booting.iteritems():
276 node.stop_if_no_cloud_node().get()
277 if not node.actor_ref.is_alive():
278 del self.booting[key]
280 self._later.stop_booting_node()
283 @_check_poll_freshness
284 def node_can_shutdown(self, node_actor):
285 if self._nodes_excess() < 1:
287 cloud_node, arvados_node = self._actor_nodes(node_actor)
288 if cloud_node.id in self.shutdowns:
290 shutdown = self._node_shutdown.start(timer_actor=self._timer,
291 cloud_client=self._new_cloud(),
292 cloud_node=cloud_node).proxy()
293 self.shutdowns[cloud_node.id] = shutdown
294 shutdown.subscribe(self._later.node_finished_shutdown)
296 def node_finished_shutdown(self, shutdown_actor):
297 cloud_node_id = shutdown_actor.cloud_node.get().id
298 shutdown_actor.stop()
299 if cloud_node_id in self.booted:
300 self.booted.pop(cloud_node_id).actor.stop()
301 del self.shutdowns[cloud_node_id]
304 self._logger.info("Shutting down after signal.")
305 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
306 for bootnode in self.booting.itervalues():
307 bootnode.stop_if_no_cloud_node()
308 self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check every second while any setup actor is still alive.
        # NOTE(review): the source appears truncated after this branch --
        # presumably an else arm stops this actor once self.booting drains;
        # confirm against the full file.
        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)