3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .config import actor_class
14 class _ComputeNodeRecord(object):
15 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16 assignment_time=float('-inf')):
18 self.cloud_node = cloud_node
19 self.arvados_node = arvados_node
20 self.assignment_time = assignment_time
23 class _BaseNodeTracker(object):
28 def __getitem__(self, key):
29 return self.nodes[key]
32 return len(self.nodes)
34 def get(self, key, default=None):
35 return self.nodes.get(key, default)
37 def record_key(self, record):
38 return self.item_key(getattr(record, self.RECORD_ATTR))
40 def add(self, record):
41 self.nodes[self.record_key(record)] = record
43 def update_record(self, key, item):
44 setattr(self.nodes[key], self.RECORD_ATTR, item)
46 def update_from(self, response):
47 unseen = set(self.nodes.iterkeys())
49 key = self.item_key(item)
52 self.update_record(key, item)
55 self.orphans = {key: self.nodes.pop(key) for key in unseen}
58 return (record for record in self.nodes.itervalues()
59 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track _ComputeNodeRecords keyed by the cloud node's id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track _ComputeNodeRecords keyed by the Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a stale Arvados node record, or None.

        A node counts as stale when both its Arvados record mtime and
        its most recent assignment time are older than stale_time.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                  not cnode.timestamp_fresh(record.assignment_time,
                                            stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately. It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
94 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95 cloud_nodes_actor, cloud_update_actor, timer_actor,
96 arvados_factory, cloud_factory,
97 shutdown_windows, min_nodes, max_nodes,
98 poll_stale_after=600, node_stale_after=7200,
99 node_setup_class=cnode.ComputeNodeSetupActor,
100 node_shutdown_class=cnode.ComputeNodeShutdownActor,
101 node_actor_class=cnode.ComputeNodeMonitorActor):
102 super(NodeManagerDaemonActor, self).__init__()
103 self._node_setup = node_setup_class
104 self._node_shutdown = node_shutdown_class
105 self._node_actor = node_actor_class
106 self._cloud_updater = cloud_update_actor
107 self._timer = timer_actor
108 self._new_arvados = arvados_factory
109 self._new_cloud = cloud_factory
110 self._cloud_driver = self._new_cloud()
111 self._logger = logging.getLogger('arvnodeman.daemon')
112 self._later = self.actor_ref.proxy()
113 self.shutdown_windows = shutdown_windows
114 self.min_nodes = min_nodes
115 self.max_nodes = max_nodes
116 self.poll_stale_after = poll_stale_after
117 self.node_stale_after = node_stale_after
119 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
120 poll_actor = locals()[poll_name + '_actor']
121 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
122 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
123 self.last_polls[poll_name] = -self.poll_stale_after
124 self.cloud_nodes = _CloudNodeTracker()
125 self.arvados_nodes = _ArvadosNodeTracker()
126 self.booting = {} # Actor IDs to ComputeNodeSetupActors
127 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
128 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
129 self._logger.debug("Daemon initialized")
131 def _update_poll_time(self, poll_key):
132 self.last_polls[poll_key] = time.time()
134 def _pair_nodes(self, node_record, arvados_node):
135 self._logger.info("Cloud node %s has associated with Arvados node %s",
136 node_record.cloud_node.id, arvados_node['uuid'])
137 self._arvados_nodes_actor.subscribe_to(
138 arvados_node['uuid'], node_record.actor.update_arvados_node)
139 node_record.arvados_node = arvados_node
140 self.arvados_nodes.add(node_record)
142 def _new_node(self, cloud_node):
143 start_time = self._cloud_driver.node_start_time(cloud_node)
144 shutdown_timer = cnode.ShutdownTimer(start_time,
145 self.shutdown_windows)
146 actor = self._node_actor.start(
147 cloud_node=cloud_node,
148 cloud_node_start_time=start_time,
149 shutdown_timer=shutdown_timer,
150 update_actor=self._cloud_updater,
151 timer_actor=self._timer,
153 poll_stale_after=self.poll_stale_after,
154 node_stale_after=self.node_stale_after).proxy()
155 actor.subscribe(self._later.node_can_shutdown)
156 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
157 actor.update_cloud_node)
158 record = _ComputeNodeRecord(actor, cloud_node)
161 def update_cloud_nodes(self, nodelist):
162 self._update_poll_time('cloud_nodes')
163 for key, node in self.cloud_nodes.update_from(nodelist):
164 self._logger.info("Registering new cloud node %s", key)
165 if key in self.booted:
166 record = self.booted.pop(key)
168 record = self._new_node(node)
169 self.cloud_nodes.add(record)
170 for arv_rec in self.arvados_nodes.unpaired():
171 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
172 self._pair_nodes(record, arv_rec.arvados_node)
174 for key, record in self.cloud_nodes.orphans.iteritems():
176 self.shutdowns.pop(key, None)
178 def update_arvados_nodes(self, nodelist):
179 self._update_poll_time('arvados_nodes')
180 for key, node in self.arvados_nodes.update_from(nodelist):
181 self._logger.info("Registering new Arvados node %s", key)
182 record = _ComputeNodeRecord(arvados_node=node)
183 self.arvados_nodes.add(record)
184 for arv_rec in self.arvados_nodes.unpaired():
185 arv_node = arv_rec.arvados_node
186 for cloud_rec in self.cloud_nodes.unpaired():
187 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
188 self._pair_nodes(cloud_rec, arv_node)
192 up = sum(len(nodelist) for nodelist in
193 [self.cloud_nodes, self.booted, self.booting])
194 return up - len(self.shutdowns)
196 def _nodes_busy(self):
197 return sum(1 for idle in
198 pykka.get_all(rec.actor.in_state('idle') for rec in
199 self.cloud_nodes.nodes.itervalues())
202 def _nodes_wanted(self):
203 return min(len(self.last_wishlist) + self._nodes_busy(),
204 self.max_nodes) - self._nodes_up()
206 def _nodes_excess(self):
207 needed_nodes = self._nodes_busy() + len(self.last_wishlist)
208 return (self._nodes_up() - max(self.min_nodes, needed_nodes))
210 def update_server_wishlist(self, wishlist):
211 self._update_poll_time('server_wishlist')
212 self.last_wishlist = wishlist
213 nodes_wanted = self._nodes_wanted()
215 self._later.start_node()
216 elif (nodes_wanted < 0) and self.booting:
217 self._later.stop_booting_node()
219 def _check_poll_freshness(orig_func):
220 """Decorator to inhibit a method when poll information is stale.
222 This decorator checks the timestamps of all the poll information the
223 daemon has received. The decorated method is only called if none
224 of the timestamps are considered stale.
226 @functools.wraps(orig_func)
227 def wrapper(self, *args, **kwargs):
229 if all(now - t < self.poll_stale_after
230 for t in self.last_polls.itervalues()):
231 return orig_func(self, *args, **kwargs)
236 @_check_poll_freshness
237 def start_node(self):
238 nodes_wanted = self._nodes_wanted()
241 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
242 cloud_size = self.last_wishlist[nodes_wanted - 1]
243 self._logger.info("Want %s more nodes. Booting a %s node.",
244 nodes_wanted, cloud_size.name)
245 new_setup = self._node_setup.start(
246 timer_actor=self._timer,
247 arvados_client=self._new_arvados(),
248 arvados_node=arvados_node,
249 cloud_client=self._new_cloud(),
250 cloud_size=cloud_size).proxy()
251 self.booting[new_setup.actor_ref.actor_urn] = new_setup
252 if arvados_node is not None:
253 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
255 new_setup.subscribe(self._later.node_up)
257 self._later.start_node()
259 def _actor_nodes(self, node_actor):
260 return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
262 def node_up(self, setup_proxy):
263 cloud_node, arvados_node = self._actor_nodes(setup_proxy)
264 del self.booting[setup_proxy.actor_ref.actor_urn]
266 record = self.cloud_nodes.get(cloud_node.id)
268 record = self._new_node(cloud_node)
269 self.booted[cloud_node.id] = record
270 self._pair_nodes(record, arvados_node)
272 @_check_poll_freshness
273 def stop_booting_node(self):
274 nodes_excess = self._nodes_excess()
275 if (nodes_excess < 1) or not self.booting:
277 for key, node in self.booting.iteritems():
278 node.stop_if_no_cloud_node().get()
279 if not node.actor_ref.is_alive():
280 del self.booting[key]
282 self._later.stop_booting_node()
285 @_check_poll_freshness
286 def node_can_shutdown(self, node_actor):
287 if self._nodes_excess() < 1:
289 cloud_node, arvados_node = self._actor_nodes(node_actor)
290 if cloud_node.id in self.shutdowns:
292 shutdown = self._node_shutdown.start(timer_actor=self._timer,
293 cloud_client=self._new_cloud(),
294 cloud_node=cloud_node).proxy()
295 self.shutdowns[cloud_node.id] = shutdown
296 shutdown.subscribe(self._later.node_finished_shutdown)
298 def node_finished_shutdown(self, shutdown_actor):
299 cloud_node_id = shutdown_actor.cloud_node.get().id
300 shutdown_actor.stop()
301 if cloud_node_id in self.booted:
302 self.booted.pop(cloud_node_id).actor.stop()
303 del self.shutdowns[cloud_node_id]
306 self._logger.info("Shutting down after signal.")
307 self.poll_stale_after = -1 # Inhibit starting/stopping nodes
308 for bootnode in self.booting.itervalues():
309 bootnode.stop_if_no_cloud_node()
310 self._later.await_shutdown()
312 def await_shutdown(self):
313 if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
314 self._timer.schedule(time.time() + 1, self._later.await_shutdown)