3 from __future__ import absolute_import, print_function
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
15 class _ComputeNodeRecord(object):
16 def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17 assignment_time=float('-inf')):
19 self.cloud_node = cloud_node
20 self.arvados_node = arvados_node
21 self.assignment_time = assignment_time
class _BaseNodeTracker(object):
    """Map-like tracker of _ComputeNodeRecords, keyed by RECORD_ATTR.

    NOTE(review): several lines of this class are elided from this
    chunk (e.g. __init__ and parts of update_from); comments describe
    only what the visible code shows.
    """
    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        # NOTE(review): `wrapper` is never returned in the visible code
        # — confirm a `return wrapper` exists in the full source.
    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        # Key a record by its primary item (cloud or Arvados node).
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        # Replace the tracked item on an existing record in place.
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        # Assume every known key is gone until the response shows it.
        unseen = set(self.nodes.iterkeys())
        # NOTE(review): the loop over `response` is elided from this
        # chunk; the two lines below clearly run per response item.
        key = self.item_key(item)
        self.update_record(key, item)
        # Records whose keys were never seen in the response.
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

        # NOTE(review): the `def unpaired(self):` header is elided from
        # this chunk; callers use tracker.unpaired() for this generator
        # of records with no paired counterpart yet.
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
class _CloudNodeTracker(_BaseNodeTracker):
    """Track compute node records, keyed by their cloud node IDs."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # A cloud node's identity is the provider-assigned ID.
        return cloud_node.id
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track compute node records, keyed by their Arvados node UUIDs."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        # Look for a record whose Arvados node has neither been updated
        # nor been assigned to a booting node recently.
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                not cnode.timestamp_fresh(record.assignment_time,
                # NOTE(review): the condition's continuation lines and
                # this method's return statements are elided from this
                # chunk.
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
97 def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98 cloud_nodes_actor, cloud_update_actor, timer_actor,
99 arvados_factory, cloud_factory,
100 shutdown_windows, min_size, min_nodes, max_nodes,
101 poll_stale_after=600,
102 boot_fail_after=1800,
103 node_stale_after=7200,
104 node_setup_class=dispatch.ComputeNodeSetupActor,
105 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106 node_actor_class=dispatch.ComputeNodeMonitorActor):
107 super(NodeManagerDaemonActor, self).__init__()
108 self._node_setup = node_setup_class
109 self._node_shutdown = node_shutdown_class
110 self._node_actor = node_actor_class
111 self._cloud_updater = cloud_update_actor
112 self._timer = timer_actor
113 self._new_arvados = arvados_factory
114 self._new_cloud = cloud_factory
115 self._cloud_driver = self._new_cloud()
116 self._logger = logging.getLogger('arvnodeman.daemon')
117 self._later = self.actor_ref.proxy()
118 self.shutdown_windows = shutdown_windows
119 self.min_cloud_size = min_size
120 self.min_nodes = min_nodes
121 self.max_nodes = max_nodes
122 self.poll_stale_after = poll_stale_after
123 self.boot_fail_after = boot_fail_after
124 self.node_stale_after = node_stale_after
126 for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127 poll_actor = locals()[poll_name + '_actor']
128 poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129 setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130 self.last_polls[poll_name] = -self.poll_stale_after
131 self.cloud_nodes = _CloudNodeTracker()
132 self.arvados_nodes = _ArvadosNodeTracker()
133 self.booting = {} # Actor IDs to ComputeNodeSetupActors
134 self.booted = {} # Cloud node IDs to _ComputeNodeRecords
135 self.shutdowns = {} # Cloud node IDs to ComputeNodeShutdownActors
136 self._logger.debug("Daemon initialized")
138 def _update_poll_time(self, poll_key):
139 self.last_polls[poll_key] = time.time()
141 def _pair_nodes(self, node_record, arvados_node):
142 self._logger.info("Cloud node %s has associated with Arvados node %s",
143 node_record.cloud_node.id, arvados_node['uuid'])
144 self._arvados_nodes_actor.subscribe_to(
145 arvados_node['uuid'], node_record.actor.update_arvados_node)
146 node_record.arvados_node = arvados_node
147 self.arvados_nodes.add(node_record)
    def _new_node(self, cloud_node):
        """Create and wire up a monitor actor for one cloud node.

        NOTE(review): one keyword argument to _node_actor.start and the
        tail of this method (presumably `return record`) are elided from
        this chunk.
        """
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        # Hear back when this node decides it is eligible for shutdown.
        actor.subscribe(self._later.node_can_shutdown)
        # Keep the monitor's cloud record fresh from the cloud poll.
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
    def update_cloud_nodes(self, nodelist):
        """Handle a fresh cloud node listing from the poll actor."""
        self._update_poll_time('cloud_nodes')
        # update_from yields (key, node) pairs only for nodes not seen
        # before; known nodes are updated in place by the tracker.
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                # We booted this node ourselves; promote its record.
                record = self.booted.pop(key)
                # NOTE(review): a line (likely `else:`) is elided here.
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            # Try to pair the new cloud node with an unpaired Arvados
            # node record.
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
        # Nodes that vanished from the cloud listing.
        # NOTE(review): one or more lines of this cleanup loop are
        # elided from this chunk.
        for key, record in self.cloud_nodes.orphans.iteritems():
            record.cloud_node = None
            self.shutdowns.pop(key, None)
    def update_arvados_nodes(self, nodelist):
        """Handle a fresh Arvados node listing from the poll actor.

        NOTE(review): this method may continue past the last visible
        line of this chunk (e.g. a `break` after pairing) — confirm.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        # Offer each unpaired Arvados node to the unpaired cloud nodes;
        # the monitor actor decides whether they match.
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
        # Total nodes under management: live cloud nodes, nodes booted
        # but not yet in the cloud listing, and nodes still booting.
        # NOTE(review): the enclosing `def _nodes_up(self):` header is
        # elided from this chunk.
        return sum(len(nodelist) for nodelist in
                   [self.cloud_nodes, self.booted, self.booting])
    def _nodes_busy(self):
        # Count monitored nodes currently reporting the 'busy' state.
        # NOTE(review): the tail of this expression (the generator's
        # filter clause) is elided from this chunk.
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues())
212 def _nodes_missing(self):
213 return sum(1 for arv_node in
214 pykka.get_all(rec.actor.arvados_node for rec in
215 self.cloud_nodes.nodes.itervalues()
216 if rec.actor.cloud_node.get().id not in self.shutdowns)
217 if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
    def _nodes_wanted(self):
        """Number of additional nodes to boot (negative = too many up)."""
        up_count = self._nodes_up()
        under_min = self.min_nodes - up_count
        over_max = up_count - self.max_nodes
        # NOTE(review): the branches clamping to the min/max limits
        # (using under_min / over_max) are elided from this chunk.
        # Discount nodes that can't take new work when comparing
        # against the wishlist.
        up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
        return len(self.last_wishlist) - up_count
    def _nodes_excess(self):
        """Number of idle nodes we could shut down (negative = none)."""
        up_count = self._nodes_up() - len(self.shutdowns)
        over_min = up_count - self.min_nodes
        # NOTE(review): the branch clamping to min_nodes (using
        # over_min) is elided from this chunk.
        return up_count - self._nodes_busy() - len(self.last_wishlist)
    def update_server_wishlist(self, wishlist):
        """Handle a fresh job-queue wishlist from the poll actor."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        nodes_wanted = self._nodes_wanted()
        # NOTE(review): the `if` guard for start_node is elided from
        # this chunk.
        self._later.start_node()
        elif (nodes_wanted < 0) and self.booting:
            self._later.stop_booting_node()
248 def _check_poll_freshness(orig_func):
249 """Decorator to inhibit a method when poll information is stale.
251 This decorator checks the timestamps of all the poll information the
252 daemon has received. The decorated method is only called if none
253 of the timestamps are considered stale.
255 @functools.wraps(orig_func)
256 def wrapper(self, *args, **kwargs):
258 if all(now - t < self.poll_stale_after
259 for t in self.last_polls.itervalues()):
260 return orig_func(self, *args, **kwargs)
    @_check_poll_freshness
    def start_node(self):
        """Boot a new cloud node if the wishlist still calls for one.

        NOTE(review): several guard lines (early return, the branch
        around the wishlist indexing, and closing punctuation near the
        assignment_time update) are elided from this chunk.
        """
        nodes_wanted = self._nodes_wanted()
        # Reuse a stale Arvados node record when one is available.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        cloud_size = self.last_wishlist[self._nodes_up()]
        # NOTE(review): a line between these two assignments (likely an
        # except/else fallback) is elided; as shown, the second always
        # wins.
        cloud_size = self.min_cloud_size
        self._logger.info("Want %s more nodes. Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Record when this Arvados record was handed to a boot.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
        new_setup.subscribe(self._later.node_up)
        # Keep booting until the wishlist is satisfied.
        self._later.start_node()
291 def _get_actor_attrs(self, actor, *attr_names):
292 return pykka.get_all([getattr(actor, name) for name in attr_names])
    def node_up(self, setup_proxy):
        """A setup actor finished booting: start tracking its node."""
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        # NOTE(review): a line is elided here (between the del and the
        # lookup), and the guard around creating a new record (likely
        # `if record is None:`) is also elided.
        record = self.cloud_nodes.get(cloud_node.id)
        record = self._new_node(cloud_node)
        self.booted[cloud_node.id] = record
        # If this node never pairs with an Arvados record in time,
        # have it shut down.
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)
    @_check_poll_freshness
    def stop_booting_node(self):
        """Cancel one in-progress node setup that has no cloud node yet."""
        nodes_excess = self._nodes_excess()
        # NOTE(review): this guard's body (likely an early return) is
        # elided from this chunk.
        if (nodes_excess < 1) or not self.booting:
        for key, node in self.booting.iteritems():
            if node.stop_if_no_cloud_node().get():
                del self.booting[key]
                # NOTE(review): a line between these two statements
                # (likely a guard and/or `break`) is elided.
                self._later.stop_booting_node()
    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for this monitor's cloud node.

        `cancellable` is forwarded to the shutdown actor unchanged.
        """
        cloud_node_id = node_actor.cloud_node.get().id
        # NOTE(review): this guard's body (likely an early return to
        # avoid a duplicate shutdown) is elided from this chunk.
        if cloud_node_id in self.shutdowns:
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        # Hear back when the shutdown completes (or is cancelled).
        shutdown.subscribe(self._later.node_finished_shutdown)
328 @_check_poll_freshness
329 def node_can_shutdown(self, node_actor):
330 if self._nodes_excess() > 0:
331 self._begin_node_shutdown(node_actor, cancellable=True)
    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down a booted node that never paired with Arvados.

        Scheduled from node_up with a boot_fail_after delay; a healthy
        node should have paired by the time this fires.
        """
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                # NOTE(review): the loop-exit / not-found handling
                # (likely break plus an else clause) is elided from
                # this chunk.
        if not record.actor.in_state('idle', 'busy').get():
            # Unpaired and not doing anything useful: force shutdown.
            self._begin_node_shutdown(record.actor, cancellable=False)
    def node_finished_shutdown(self, shutdown_actor):
        """Clean up bookkeeping after a shutdown actor finishes."""
        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                    # NOTE(review): the second
                                                    # attribute name (likely
                                                    # 'cloud_node') is elided.
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        # NOTE(review): the branch structure here (the condition before
        # the first del, likely `if not success:`) is elided from this
        # chunk.
        del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            # The node we booted is gone; stop its monitor too.
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]
        # NOTE(review): the enclosing `def` header (likely
        # `def shutdown(self):`) is elided from this chunk.
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        # Ask every setup actor without a cloud node to stop; keep only
        # the ones that could not be stopped (they already have a node).
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()
    def await_shutdown(self):
        # Re-check once a second until outstanding actors finish.
        # NOTE(review): the guard condition (likely checking
        # self.booting / self.shutdowns) is elided from this chunk.
        self._timer.schedule(time.time() + 1, self._later.await_shutdown)