3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .config import actor_class
14 class _ComputeNodeRecord(object):
15 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16 assignment_time=float('-inf')):
18 self.cloud_node = cloud_node
19 self.arvados_node = arvados_node
20 self.assignment_time = assignment_time
23 class _BaseNodeTracker(object):
28 def __getitem__(self, key):
29 return self.nodes[key]
32 return len(self.nodes)
34 def get(self, key, default=None):
35 return self.nodes.get(key, default)
37 def record_key(self, record):
38 return self.item_key(getattr(record, self.RECORD_ATTR))
40 def add(self, record):
41 self.nodes[self.record_key(record)] = record
43 def update_record(self, key, item):
44 setattr(self.nodes[key], self.RECORD_ATTR, item)
46 def update_from(self, response):
47 unseen = set(self.nodes.iterkeys())
49 key = self.item_key(item)
52 self.update_record(key, item)
55 self.orphans = {key: self.nodes.pop(key) for key in unseen}
58 return (record for record in self.nodes.itervalues()
59 if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track cloud node records, keyed by the cloud provider's node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track Arvados node records, keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return an Arvados node that looks reusable, or None.

        A node qualifies when both its modification time and its last boot
        assignment time are older than stale_time.  (The continuation and
        return lines were dropped from the reviewed listing; reconstructed
        from the visible condition and the caller in start_node.)
        """
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
94 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95 cloud_nodes_actor, cloud_update_actor, timer_actor,
96 arvados_factory, cloud_factory,
97 shutdown_windows, max_nodes,
98 poll_stale_after=600, node_stale_after=7200,
99 node_setup_class=cnode.ComputeNodeSetupActor,
100 node_shutdown_class=cnode.ComputeNodeShutdownActor,
101 node_actor_class=cnode.ComputeNodeMonitorActor):
102 super(NodeManagerDaemonActor, self).__init__()
103 self._node_setup = node_setup_class
104 self._node_shutdown = node_shutdown_class
105 self._node_actor = node_actor_class
106 self._cloud_updater = cloud_update_actor
107 self._timer = timer_actor
108 self._new_arvados = arvados_factory
109 self._new_cloud = cloud_factory
110 self._cloud_driver = self._new_cloud()
111 self._logger = logging.getLogger('arvnodeman.daemon')
112 self._later = self.actor_ref.proxy()
113 self.shutdown_windows = shutdown_windows
114 self.max_nodes = max_nodes
115 self.poll_stale_after = poll_stale_after
116 self.node_stale_after = node_stale_after
118 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
119 poll_actor = locals()[poll_name + '_actor']
120 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
121 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
122 self.last_polls[poll_name] = -self.poll_stale_after
123 self.cloud_nodes = _CloudNodeTracker()
124 self.arvados_nodes = _ArvadosNodeTracker()
125 self.booting = {} # Actor IDs to ComputeNodeSetupActors
126 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
127 self._logger.debug("Daemon initialized")
129 def _update_poll_time(self, poll_key):
130 self.last_polls[poll_key] = time.time()
132 def _pair_nodes(self, node_record, arvados_node):
133 self._logger.info("Cloud node %s has associated with Arvados node %s",
134 node_record.cloud_node.id, arvados_node['uuid'])
135 self._arvados_nodes_actor.subscribe_to(
136 arvados_node['uuid'], node_record.actor.update_arvados_node)
137 node_record.arvados_node = arvados_node
138 self.arvados_nodes.add(node_record)
140 def _new_node(self, cloud_node):
141 start_time = self._cloud_driver.node_start_time(cloud_node)
142 shutdown_timer = cnode.ShutdownTimer(start_time,
143 self.shutdown_windows)
144 actor = self._node_actor.start(
145 cloud_node=cloud_node,
146 cloud_node_start_time=start_time,
147 shutdown_timer=shutdown_timer,
148 update_actor=self._cloud_updater,
149 timer_actor=self._timer,
151 poll_stale_after=self.poll_stale_after,
152 node_stale_after=self.node_stale_after).proxy()
153 actor.subscribe(self._later.node_can_shutdown)
154 self._cloud_nodes_actor.subscribe_to(cloud_node.id,
155 actor.update_cloud_node)
156 record = _ComputeNodeRecord(actor, cloud_node)
157 self.cloud_nodes.add(record)
    def update_cloud_nodes(self, nodelist):
        """Handle a fresh cloud node listing.

        Registers a monitor actor for each newly seen cloud node, tries to
        pair it with an unpaired Arvados node record, and cleans up shutdown
        bookkeeping for nodes that disappeared from the listing.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            record = self._new_node(node)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
        # NOTE(review): the reviewed listing appears to drop a line or two
        # around here (e.g. inside the pairing loop and at the top of the
        # orphan loop) -- confirm against the full source.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                # The node is gone; its shutdown actor is no longer needed.
                self.shutdowns.pop(key).stop()
174 def update_arvados_nodes(self, nodelist):
175 self._update_poll_time('arvados_nodes')
176 for key, node in self.arvados_nodes.update_from(nodelist):
177 self._logger.info("Registering new Arvados node %s", key)
178 record = _ComputeNodeRecord(arvados_node=node)
179 self.arvados_nodes.add(record)
180 for arv_rec in self.arvados_nodes.unpaired():
181 arv_node = arv_rec.arvados_node
182 for cloud_rec in self.cloud_nodes.unpaired():
183 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
184 self._pair_nodes(cloud_rec, arv_node)
187 def _node_count(self):
188 up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
189 return up - len(self.shutdowns)
191 def _nodes_wanted(self):
192 return len(self.last_wishlist) - self._node_count()
194 def _nodes_excess(self):
195 return -self._nodes_wanted()
197 def update_server_wishlist(self, wishlist):
198 self._update_poll_time('server_wishlist')
199 self.last_wishlist = wishlist[:self.max_nodes]
200 nodes_wanted = self._nodes_wanted()
202 self._later.start_node()
203 elif (nodes_wanted < 0) and self.booting:
204 self._later.stop_booting_node()
206 def _check_poll_freshness(orig_func):
207 """Decorator to inhibit a method when poll information is stale.
209 This decorator checks the timestamps of all the poll information the
210 daemon has received. The decorated method is only called if none
211 of the timestamps are considered stale.
213 @functools.wraps(orig_func)
214 def wrapper(self, *args, **kwargs):
216 if all(now - t < self.poll_stale_after
217 for t in self.last_polls.itervalues()):
218 return orig_func(self, *args, **kwargs)
223 @_check_poll_freshness
224 def start_node(self):
225 nodes_wanted = self._nodes_wanted()
228 arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
229 cloud_size = self.last_wishlist[nodes_wanted - 1]
230 self._logger.info("Want %s more nodes. Booting a %s node.",
231 nodes_wanted, cloud_size.name)
232 new_setup = self._node_setup.start(
233 timer_actor=self._timer,
234 arvados_client=self._new_arvados(),
235 arvados_node=arvados_node,
236 cloud_client=self._new_cloud(),
237 cloud_size=cloud_size).proxy()
238 self.booting[new_setup.actor_ref.actor_urn] = new_setup
239 if arvados_node is not None:
240 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
242 new_setup.subscribe(self._later.node_up)
244 self._later.start_node()
246 def _actor_nodes(self, node_actor):
247 return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
249 def node_up(self, setup_proxy):
250 cloud_node, arvados_node = self._actor_nodes(setup_proxy)
251 del self.booting[setup_proxy.actor_ref.actor_urn]
253 record = self.cloud_nodes.get(cloud_node.id)
255 record = self._new_node(cloud_node)
256 self._pair_nodes(record, arvados_node)
258 @_check_poll_freshness
259 def stop_booting_node(self):
260 nodes_excess = self._nodes_excess()
261 if (nodes_excess < 1) or not self.booting:
263 for key, node in self.booting.iteritems():
264 node.stop_if_no_cloud_node().get()
265 if not node.actor_ref.is_alive():
266 del self.booting[key]
268 self._later.stop_booting_node()
271 @_check_poll_freshness
272 def node_can_shutdown(self, node_actor):
273 if self._nodes_excess() < 1:
275 cloud_node, arvados_node = self._actor_nodes(node_actor)
276 if cloud_node.id in self.shutdowns:
278 shutdown = self._node_shutdown.start(timer_actor=self._timer,
279 cloud_client=self._new_cloud(),
280 cloud_node=cloud_node).proxy()
281 self.shutdowns[cloud_node.id] = shutdown
        # NOTE(review): the enclosing `def` header for this handler is
        # missing from the reviewed listing -- these statements stop new
        # node activity and begin an orderly daemon shutdown; confirm the
        # method name and signature against the full source.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1 # Inhibit starting/stopping nodes
        # Cancel any boots that have not yet produced a cloud node.
        for bootnode in self.booting.itervalues():
            bootnode.stop_if_no_cloud_node()
        self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check once a second until no setup actors remain alive.
        # NOTE(review): the method likely continues past this excerpt
        # (e.g. a final stop once booting is empty) -- confirm against
        # the full source.
        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)