3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
class _ComputeNodeRecord(object):
    """Bundle everything the daemon knows about one compute node.

    Attributes:
      actor: the monitor actor (proxy) watching the node, or None.
      cloud_node: the cloud provider's node object, or None.
      arvados_node: the Arvados node record, or None.
      assignment_time: timestamp of the last time the Arvados node record
        was handed to a booting cloud node; negative infinity means never.
    """

    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # The daemon reads `record.actor` when pairing and shutting down
        # nodes, so the argument has to be kept on the record.
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    """Keep node records in a dictionary keyed by one of their items.

    Subclasses must define:
      RECORD_ATTR: name of the record attribute holding the tracked item.
      PAIR_ATTR: name of the record attribute holding the counterpart
        item; a record whose PAIR_ATTR is None counts as unpaired.
      item_key: callable mapping a tracked item to its dictionary key.

    Attributes:
      nodes: dict mapping item keys to records.
      orphans: dict of records dropped by the last completed update_from().
    """

    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)

        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # The class-body loop below stores this return value as the method.
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        """Return the dictionary key for a record, from its tracked item."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Store a record under its own key, replacing any previous one."""
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        """Replace the tracked item on the record already stored at key."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Refresh tracked records from a fresh listing of items.

        Items whose key is already tracked are written onto their record.
        Items with an unknown key are yielded as (key, item) pairs so the
        caller can build and add() a record for them.  Once the generator
        is exhausted, every record that was tracked before the call but
        absent from the listing is moved from self.nodes to self.orphans.
        """
        # Snapshot the keys: callers add() new records while iterating.
        unseen = set(self.nodes)
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Return a generator over records whose PAIR_ATTR is None."""
        return (record for record in self.nodes.values()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker for cloud nodes, keyed by the cloud node's `id` attribute.

    A record is considered unpaired while its `arvados_node` is None.
    """
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the dictionary key for a cloud node object."""
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracker for Arvados node records, keyed by their 'uuid' entry.

    A record is considered unpaired while its `cloud_node` is None.
    """
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a tracked Arvados node that is stale, or None.

        A node qualifies when neither its modification time (as reported by
        cnode.arvados_node_mtime) nor the record's assignment_time passes
        cnode.timestamp_fresh for stale_time.  The caller indexes the
        result by 'uuid' and tests it against None.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            # NOTE(review): the two freshness tests are joined with `and`
            # so a record that was just assigned to a booting node is not
            # handed out again -- confirm against the upstream source.
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                    not cnode.timestamp_fresh(record.assignment_time,
                                              stale_time)):
                return node
        return None
87 class NodeManagerDaemonActor(actor_class):
88 """Node Manager daemon.
    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
def __init__(self, server_wishlist_actor, arvados_nodes_actor,
             cloud_nodes_actor, cloud_update_actor, timer_actor,
             arvados_factory, cloud_factory,
             shutdown_windows, min_nodes, max_nodes,
             poll_stale_after=600,
             boot_fail_after=1800,
             node_stale_after=7200,
             node_setup_class=dispatch.ComputeNodeSetupActor,
             node_shutdown_class=dispatch.ComputeNodeShutdownActor,
             node_actor_class=dispatch.ComputeNodeMonitorActor):
    """Store configuration and subscribe to the three poll actors.

    Arguments:
      server_wishlist_actor, arvados_nodes_actor, cloud_nodes_actor: poll
        actors; each is subscribed to the matching `update_<name>` method
        of this daemon and kept as `self._<name>_actor`.
      cloud_update_actor: passed to node monitor actors as update_actor.
      timer_actor: actor used to schedule delayed calls.
      arvados_factory, cloud_factory: zero-argument callables producing
        new API clients; cloud_factory is also called once here to
        create `self._cloud_driver`.
      shutdown_windows: passed to cnode.ShutdownTimer for new nodes.
      min_nodes, max_nodes: bounds applied when sizing the cluster.
      poll_stale_after: age past which poll data is considered stale.
      boot_fail_after: delay before a booted node that is still unpaired
        is shut down.
      node_stale_after: staleness threshold for Arvados node records.
      node_setup_class, node_shutdown_class, node_actor_class: actor
        classes started for setup, shutdown and monitoring.
    """
    super(NodeManagerDaemonActor, self).__init__()
    self._node_setup = node_setup_class
    self._node_shutdown = node_shutdown_class
    self._node_actor = node_actor_class
    self._cloud_updater = cloud_update_actor
    self._timer = timer_actor
    self._new_arvados = arvados_factory
    self._new_cloud = cloud_factory
    self._cloud_driver = self._new_cloud()
    self._logger = logging.getLogger('arvnodeman.daemon')
    self._later = self.actor_ref.proxy()
    self.shutdown_windows = shutdown_windows
    self.min_nodes = min_nodes
    self.max_nodes = max_nodes
    self.poll_stale_after = poll_stale_after
    self.boot_fail_after = boot_fail_after
    self.node_stale_after = node_stale_after
    # Must exist before the loop below writes the initial timestamps.
    self.last_polls = {}
    poll_actors = [('server_wishlist', server_wishlist_actor),
                   ('arvados_nodes', arvados_nodes_actor),
                   ('cloud_nodes', cloud_nodes_actor)]
    for poll_name, poll_actor in poll_actors:
        poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
        setattr(self, '_{}_actor'.format(poll_name), poll_actor)
        # A negative timestamp makes every poll count as stale until its
        # first real update arrives.
        self.last_polls[poll_name] = -self.poll_stale_after
    self.cloud_nodes = _CloudNodeTracker()
    self.arvados_nodes = _ArvadosNodeTracker()
    self.booting = {}       # Actor IDs to ComputeNodeSetupActors
    self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
    self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
    self._logger.debug("Daemon initialized")
def _update_poll_time(self, poll_key):
    """Stamp poll_key in self.last_polls with the current time."""
    now = time.time()
    self.last_polls[poll_key] = now
def _pair_nodes(self, node_record, arvados_node):
    """Associate an Arvados node record with a cloud node's record.

    Logs the pairing, subscribes the record's monitor actor to updates
    for that Arvados node, stores the Arvados node on the record, and
    registers the record with the Arvados node tracker.
    """
    uuid = arvados_node['uuid']
    self._logger.info("Cloud node %s has associated with Arvados node %s",
                      node_record.cloud_node.id, uuid)
    callback = node_record.actor.update_arvados_node
    self._arvados_nodes_actor.subscribe_to(uuid, callback)
    node_record.arvados_node = arvados_node
    self.arvados_nodes.add(node_record)
def _new_node(self, cloud_node):
    """Start a monitor actor for cloud_node and return its record.

    The new actor is subscribed to this daemon's node_can_shutdown
    callback, and to updates for cloud_node.id from the cloud node poll
    actor.  Returns a _ComputeNodeRecord holding the actor proxy and the
    cloud node; callers store that record in their trackers.
    """
    start_time = self._cloud_driver.node_start_time(cloud_node)
    shutdown_timer = cnode.ShutdownTimer(start_time,
                                         self.shutdown_windows)
    actor = self._node_actor.start(
        cloud_node=cloud_node,
        cloud_node_start_time=start_time,
        shutdown_timer=shutdown_timer,
        update_actor=self._cloud_updater,
        timer_actor=self._timer,
        poll_stale_after=self.poll_stale_after,
        node_stale_after=self.node_stale_after).proxy()
    actor.subscribe(self._later.node_can_shutdown)
    self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                         actor.update_cloud_node)
    record = _ComputeNodeRecord(actor, cloud_node)
    # Callers assign and add() the result, so it must be returned.
    return record
def update_cloud_nodes(self, nodelist):
    """Handle a fresh cloud node listing from the cloud node poll.

    For each node not tracked yet, reuse the record created when the node
    finished booting (if any) or start a new monitor for it, then offer
    it each unpaired Arvados node until one is accepted.  Records for
    nodes that vanished from the listing are cleaned up afterwards.
    """
    self._update_poll_time('cloud_nodes')
    for key, node in self.cloud_nodes.update_from(nodelist):
        self._logger.info("Registering new cloud node %s", key)
        if key in self.booted:
            record = self.booted.pop(key)
        else:
            # Only start a new monitor when none exists from boot time;
            # otherwise the booted record would be discarded.
            record = self._new_node(node)
        self.cloud_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                self._pair_nodes(record, arv_rec.arvados_node)
                # One cloud node pairs with at most one Arvados node.
                break
    for key, record in self.cloud_nodes.orphans.items():
        # The node is gone from the cloud listing: stop its monitor so
        # the actor does not outlive the node.
        record.actor.stop()
        record.cloud_node = None
        self.shutdowns.pop(key, None)
def update_arvados_nodes(self, nodelist):
    """Handle a fresh Arvados node listing from the Arvados node poll.

    Registers a record for every Arvados node not tracked yet, then
    offers each unpaired Arvados node to the unpaired cloud nodes until
    one of them accepts it.
    """
    self._update_poll_time('arvados_nodes')
    for key, node in self.arvados_nodes.update_from(nodelist):
        self._logger.info("Registering new Arvados node %s", key)
        record = _ComputeNodeRecord(arvados_node=node)
        self.arvados_nodes.add(record)
    for arv_rec in self.arvados_nodes.unpaired():
        arv_node = arv_rec.arvados_node
        for cloud_rec in self.cloud_nodes.unpaired():
            if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                self._pair_nodes(cloud_rec, arv_node)
                # Stop offering this Arvados node once it is taken, so
                # it cannot be paired with a second cloud node.
                break
def _nodes_up(self):
    """Return the number of nodes that are tracked, booted, or booting.

    Sums the sizes of self.cloud_nodes, self.booted and self.booting.
    """
    return sum(len(nodelist) for nodelist in
               [self.cloud_nodes, self.booted, self.booting])
def _nodes_busy(self):
    """Return how many tracked cloud nodes report that they are not idle.

    Asks every monitor actor in self.cloud_nodes whether it is in the
    'idle' state and waits for all the answers with pykka.get_all.
    """
    idle_answers = pykka.get_all(rec.actor.in_state('idle') for rec in
                                 self.cloud_nodes.nodes.values())
    # NOTE(review): only an explicit False is counted as busy, so any other
    # falsy answer (e.g. None for "unknown") is not treated as busy --
    # confirm this matches what in_state() can return.
    return sum(1 for idle in idle_answers if idle is False)
def _nodes_wanted(self):
    """Return how many more nodes should be started.

    When the number of nodes that are up has reached or passed
    self.max_nodes, the result is zero or negative (minus the overage).
    Otherwise nodes that are shutting down or busy are not counted as
    available, and the result is the wishlist length minus the rest.
    """
    up_count = self._nodes_up()
    over_max = up_count - self.max_nodes
    if over_max >= 0:
        # At or above the cap: never ask for more nodes.
        return -over_max
    up_count -= len(self.shutdowns) + self._nodes_busy()
    return len(self.last_wishlist) - up_count
def _nodes_excess(self):
    """Return how many nodes could be shut down.

    Nodes already shutting down are not counted as up.  When the
    remaining count is at or below self.min_nodes the result is zero or
    negative (the shortfall).  Otherwise it is the count of nodes that
    are neither busy nor asked for by the wishlist.
    """
    up_count = self._nodes_up() - len(self.shutdowns)
    over_min = up_count - self.min_nodes
    if over_min <= 0:
        # At or below the floor: nothing may be shut down.
        return over_min
    return up_count - self._nodes_busy() - len(self.last_wishlist)
def update_server_wishlist(self, wishlist):
    """Handle a fresh server wishlist from the job queue poll.

    Stores the wishlist, then asks this actor to start a node when more
    nodes are wanted, or to stop a booting node when fewer are wanted and
    at least one node is still booting.
    """
    self._update_poll_time('server_wishlist')
    self.last_wishlist = wishlist
    nodes_wanted = self._nodes_wanted()
    if nodes_wanted > 0:
        self._later.start_node()
    elif (nodes_wanted < 0) and self.booting:
        self._later.stop_booting_node()
def _check_poll_freshness(orig_func):
    """Decorator to inhibit a method when poll information is stale.

    This decorator checks the timestamps of all the poll information the
    daemon has received.  The decorated method is only called if none
    of the timestamps are considered stale; otherwise the wrapper returns
    None without calling it.
    """
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        now = time.time()
        # A poll is fresh while its age is below self.poll_stale_after.
        if all(now - t < self.poll_stale_after
               for t in self.last_polls.values()):
            return orig_func(self, *args, **kwargs)
        return None
    return wrapper
@_check_poll_freshness
def start_node(self):
    """Boot one new cloud node if more nodes are wanted.

    Picks the size from the wishlist entry at index nodes_wanted - 1,
    reuses a stale Arvados node record when one is available, starts a
    setup actor for the new node and tracks it in self.booting.  When
    more than one node is wanted, schedules another call for the next.
    """
    nodes_wanted = self._nodes_wanted()
    if nodes_wanted < 1:
        # Nothing to do; also keeps the wishlist index below valid.
        return None
    arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
    cloud_size = self.last_wishlist[nodes_wanted - 1]
    self._logger.info("Want %s more nodes. Booting a %s node.",
                      nodes_wanted, cloud_size.name)
    new_setup = self._node_setup.start(
        timer_actor=self._timer,
        arvados_client=self._new_arvados(),
        arvados_node=arvados_node,
        cloud_client=self._new_cloud(),
        cloud_size=cloud_size).proxy()
    self.booting[new_setup.actor_ref.actor_urn] = new_setup
    if arvados_node is not None:
        # Remember when the record was handed out so find_stale_node()
        # does not offer it again right away.
        self.arvados_nodes[arvados_node['uuid']].assignment_time = (
            time.time())
    new_setup.subscribe(self._later.node_up)
    if nodes_wanted > 1:
        self._later.start_node()
def _get_actor_attrs(self, actor, *attr_names):
    """Fetch several attributes from an actor proxy in one go.

    Looks up each named attribute on actor and passes the resulting list
    to pykka.get_all, returning whatever that call returns.
    """
    pending = [getattr(actor, attr) for attr in attr_names]
    return pykka.get_all(pending)
def node_up(self, setup_proxy):
    """Handle a setup actor reporting that its cloud node is up.

    Removes the setup actor from self.booting and stops it.  If the cloud
    node is not tracked yet, starts a monitor for it and remembers the
    record in self.booted.  Finally schedules shutdown_unpaired_node for
    the node, self.boot_fail_after seconds from now.
    """
    cloud_node = setup_proxy.cloud_node.get()
    del self.booting[setup_proxy.actor_ref.actor_urn]
    # The setup actor is no longer referenced after this point; stop it,
    # as node_finished_shutdown() does with shutdown actors.
    setup_proxy.stop()
    record = self.cloud_nodes.get(cloud_node.id)
    if record is None:
        # Only create a monitor when the cloud poll has not already
        # registered this node.
        record = self._new_node(cloud_node)
        self.booted[cloud_node.id] = record
    self._timer.schedule(time.time() + self.boot_fail_after,
                         self._later.shutdown_unpaired_node, cloud_node.id)
@_check_poll_freshness
def stop_booting_node(self):
    """Cancel one booting node if there are more nodes than needed.

    Asks setup actors in turn to stop if they have no cloud node yet.
    The first one that actually stops is removed from self.booting; if
    more than one node is in excess, another call is scheduled.
    """
    nodes_excess = self._nodes_excess()
    if (nodes_excess < 1) or not self.booting:
        return None
    # Iterate over a copy: an entry is deleted inside the loop.
    for key, node in list(self.booting.items()):
        node.stop_if_no_cloud_node().get()
        if not node.actor_ref.is_alive():
            del self.booting[key]
            if nodes_excess > 1:
                self._later.stop_booting_node()
            # Only one node is cancelled per call.
            break
def _begin_node_shutdown(self, node_actor, cancellable):
    """Start a shutdown actor for the node watched by node_actor.

    Does nothing when a shutdown is already recorded for that cloud node.
    Otherwise starts a shutdown actor, records its proxy in
    self.shutdowns under the cloud node id, and subscribes
    node_finished_shutdown to it.
    """
    cloud_node_id = node_actor.cloud_node.get().id
    if cloud_node_id in self.shutdowns:
        # A shutdown for this node is already in progress.
        return None
    shutdown = self._node_shutdown.start(
        timer_actor=self._timer, cloud_client=self._new_cloud(),
        node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
    self.shutdowns[cloud_node_id] = shutdown
    shutdown.subscribe(self._later.node_finished_shutdown)
@_check_poll_freshness
def node_can_shutdown(self, node_actor):
    """Begin a cancellable shutdown of a node if there are nodes to spare.

    Nothing happens unless _nodes_excess() reports a positive count.
    """
    surplus = self._nodes_excess()
    if surplus <= 0:
        return
    self._begin_node_shutdown(node_actor, cancellable=True)
def shutdown_unpaired_node(self, cloud_node_id):
    """Shut down a cloud node that never paired with an Arvados node.

    Looks the node up in self.cloud_nodes first, then self.booted.  An
    unknown id is ignored.  If the record found has no Arvados node, a
    non-cancellable shutdown is started for it.
    """
    for record_dict in [self.cloud_nodes, self.booted]:
        if cloud_node_id in record_dict:
            record = record_dict[cloud_node_id]
            break
    else:
        # The node is no longer known; nothing to shut down.
        return None
    if record.arvados_node is None:
        self._begin_node_shutdown(record.actor, cancellable=False)
def node_finished_shutdown(self, shutdown_actor):
    """Handle a shutdown actor reporting that it has finished.

    Reads the actor's `success` and `cloud_node` attributes and stops the
    actor.  After a failed shutdown the self.shutdowns entry is removed.
    After a successful one, a node that is only known through
    self.booted has its monitor stopped and its entries removed.  For any
    other node the self.shutdowns entry is left in place;
    update_cloud_nodes removes it once the node is orphaned.
    """
    success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                'cloud_node')
    shutdown_actor.stop()
    cloud_node_id = cloud_node.id
    if not success:
        # Forget the failed attempt so a later shutdown can be started.
        del self.shutdowns[cloud_node_id]
    elif cloud_node_id in self.booted:
        self.booted.pop(cloud_node_id).actor.stop()
        del self.shutdowns[cloud_node_id]
# NOTE(review): the `def` line for this body is not visible in this view;
# the name `shutdown` is inferred from the log message -- confirm.
def shutdown(self):
    """Stop managing nodes and begin waiting for booting nodes to finish.

    Sets poll_stale_after to -1 so methods guarded by the poll freshness
    check stop running, asks every setup actor to stop if it has no cloud
    node yet, and schedules await_shutdown.
    """
    self._logger.info("Shutting down after signal.")
    self.poll_stale_after = -1  # Inhibit starting/stopping nodes
    for bootnode in self.booting.values():
        bootnode.stop_if_no_cloud_node()
    self._later.await_shutdown()
def await_shutdown(self):
    """Poll once a second while any setup actor is still alive.

    NOTE(review): this view ends here; any handling for the case where no
    setup actor remains alive is not visible -- confirm against the full
    file.
    """
    if any(node.actor_ref.is_alive() for node in self.booting.values()):
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)