from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class

class _ComputeNodeRecord(object):
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time

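# Base class for the node trackers below.  Each subclass defines RECORD_ATTR
# (the record attribute it indexes by), PAIR_ATTR (the attribute that is set
# once the record has been paired), and item_key (how to key an item).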
class _BaseNodeTracker(object):
    def __init__(self):
        self.nodes = {}
        self.orphans = {}

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

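    # Update tracked records from a poll response: known items refresh their
    # records in place, unknown items are yielded as (key, item) pairs for
    # the caller to register, and records missing from the response are
    # moved to self.orphans.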
    def update_from(self, response):
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)

class _CloudNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)

class _ArvadosNodeTracker(_BaseNodeTracker):
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                not cnode.timestamp_fresh(record.assignment_time,
                                          stale_time)):
                return node
        return None

class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue. It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately. It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, min_size, min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.min_cloud_size = min_size
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
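        # Remember when each poll (server wishlist, Arvados nodes, cloud
        # nodes) last reported in; seed the timestamps far enough in the
        # past that every poll counts as stale until real data arrives.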
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}     # Actor IDs to ComputeNodeSetupActors
        self.booted = {}      # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}   # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s has associated with Arvados node %s",
                          node_record.cloud_node.id, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

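    # Create a monitor actor and tracking record for a cloud node the daemon
    # is not yet managing, subscribe the daemon to its shutdown-eligibility
    # updates, and feed it cloud node poll data.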
    def _new_node(self, cloud_node):
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after).proxy()
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record

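    # Poll callback: reconcile the trackers with the latest cloud node list,
    # registering newly seen nodes, offering them to unpaired Arvados
    # records, and cleaning up records for nodes the cloud no longer reports.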
    def update_cloud_nodes(self, nodelist):
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            record.actor.stop()
            record.cloud_node = None
            self.shutdowns.pop(key, None)

    def update_arvados_nodes(self, nodelist):
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

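    # Capacity helpers: _nodes_up counts nodes that exist or are being
    # created; _nodes_busy counts monitored cloud nodes that are not idle.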
    def _nodes_up(self):
        return sum(len(nodelist) for nodelist in
                   [self.cloud_nodes, self.booted, self.booting])

    def _nodes_busy(self):
        return sum(1 for idle in
                   pykka.get_all(rec.actor.in_state('idle') for rec in
                                 self.cloud_nodes.nodes.itervalues())
                   if not idle)

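    # Demand helpers: _nodes_wanted and _nodes_excess compare those counts
    # against min_nodes, max_nodes, pending shutdowns, and the current
    # server wishlist.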
    def _nodes_wanted(self):
        up_count = self._nodes_up()
        under_min = self.min_nodes - up_count
        over_max = up_count - self.max_nodes
        if under_min > 0:
            return under_min
        elif over_max >= 0:
            return -over_max
        else:
            up_count -= len(self.shutdowns) + self._nodes_busy()
            return len(self.last_wishlist) - up_count

    def _nodes_excess(self):
        up_count = self._nodes_up() - len(self.shutdowns)
        over_min = up_count - self.min_nodes
        if over_min <= 0:
            return over_min
        else:
            return up_count - self._nodes_busy() - len(self.last_wishlist)

    def update_server_wishlist(self, wishlist):
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted > 0:
            self._later.start_node()
        elif (nodes_wanted < 0) and self.booting:
            self._later.stop_booting_node()

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received. The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
        return wrapper

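    # Boot one new node, reusing a stale Arvados node record when one is
    # available, and reschedule this method while more nodes are wanted.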
    @_check_poll_freshness
    def start_node(self):
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        try:
            cloud_size = self.last_wishlist[self._nodes_up()]
        except IndexError:
            cloud_size = self.min_cloud_size
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node()

    def _get_actor_attrs(self, actor, *attr_names):
        return pykka.get_all([getattr(actor, name) for name in attr_names])

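    # Callback from a ComputeNodeSetupActor: the cloud node has been created.
    # Start tracking it and schedule a check to shut it down if it never
    # pairs with an Arvados node.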
    def node_up(self, setup_proxy):
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        setup_proxy.stop()
        record = self.cloud_nodes.get(cloud_node.id)
        if record is None:
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self):
        nodes_excess = self._nodes_excess()
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            node.stop_if_no_cloud_node().get()
            if not node.actor_ref.is_alive():
                del self.booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node()
                break

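    # Start a ComputeNodeShutdownActor for the given monitor actor, unless a
    # shutdown for that cloud node is already in progress.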
    def _begin_node_shutdown(self, node_actor, cancellable):
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        if self._nodes_excess() > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if record.arvados_node is None:
            self._begin_node_shutdown(record.actor, cancellable=False)

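    # Callback from a ComputeNodeShutdownActor. On failure, forget the
    # shutdown so it can be retried later; on success, clean up the record
    # and monitor actor of a node that was still only in self.booted.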
    def node_finished_shutdown(self, shutdown_actor):
        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                    'cloud_node')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]

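    # Signal-handler entry point: stop launching nodes, tell setup actors
    # without a cloud node to stop, then poll until the remaining setup
    # actors have finished before stopping the daemon actor itself.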
    def shutdown(self):
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        for bootnode in self.booting.itervalues():
            bootnode.stop_if_no_cloud_node()
        self._later.await_shutdown()

    def await_shutdown(self):
        if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()