4523: preparing to merge master
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, min_size, min_nodes, max_nodes,
101                  poll_stale_after=600,
102                  boot_fail_after=1800,
103                  node_stale_after=7200,
104                  node_setup_class=dispatch.ComputeNodeSetupActor,
105                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106                  node_actor_class=dispatch.ComputeNodeMonitorActor):
107         super(NodeManagerDaemonActor, self).__init__()
108         self._node_setup = node_setup_class
109         self._node_shutdown = node_shutdown_class
110         self._node_actor = node_actor_class
111         self._cloud_updater = cloud_update_actor
112         self._timer = timer_actor
113         self._new_arvados = arvados_factory
114         self._new_cloud = cloud_factory
115         self._cloud_driver = self._new_cloud()
116         self._logger = logging.getLogger('arvnodeman.daemon')
117         self._later = self.actor_ref.proxy()
118         self.shutdown_windows = shutdown_windows
119         self.min_cloud_size = min_size
120         self.min_nodes = min_nodes
121         self.max_nodes = max_nodes
122         self.poll_stale_after = poll_stale_after
123         self.boot_fail_after = boot_fail_after
124         self.node_stale_after = node_stale_after
125         self.last_polls = {}
126         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127             poll_actor = locals()[poll_name + '_actor']
128             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130             self.last_polls[poll_name] = -self.poll_stale_after
131         self.cloud_nodes = _CloudNodeTracker()
132         self.arvados_nodes = _ArvadosNodeTracker()
133         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
134         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
135         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
136         self._logger.debug("Daemon initialized")
137
138     def _update_poll_time(self, poll_key):
139         self.last_polls[poll_key] = time.time()
140
141     def _pair_nodes(self, node_record, arvados_node):
142         self._logger.info("Cloud node %s has associated with Arvados node %s",
143                           node_record.cloud_node.id, arvados_node['uuid'])
144         self._arvados_nodes_actor.subscribe_to(
145             arvados_node['uuid'], node_record.actor.update_arvados_node)
146         node_record.arvados_node = arvados_node
147         self.arvados_nodes.add(node_record)
148
149     def _new_node(self, cloud_node):
150         start_time = self._cloud_driver.node_start_time(cloud_node)
151         shutdown_timer = cnode.ShutdownTimer(start_time,
152                                              self.shutdown_windows)
153         actor = self._node_actor.start(
154             cloud_node=cloud_node,
155             cloud_node_start_time=start_time,
156             shutdown_timer=shutdown_timer,
157             update_actor=self._cloud_updater,
158             timer_actor=self._timer,
159             arvados_node=None,
160             poll_stale_after=self.poll_stale_after,
161             node_stale_after=self.node_stale_after).proxy()
162         actor.subscribe(self._later.node_can_shutdown)
163         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
164                                              actor.update_cloud_node)
165         record = _ComputeNodeRecord(actor, cloud_node)
166         return record
167
168     def update_cloud_nodes(self, nodelist):
169         self._update_poll_time('cloud_nodes')
170         for key, node in self.cloud_nodes.update_from(nodelist):
171             self._logger.info("Registering new cloud node %s", key)
172             if key in self.booted:
173                 record = self.booted.pop(key)
174             else:
175                 record = self._new_node(node)
176             self.cloud_nodes.add(record)
177             for arv_rec in self.arvados_nodes.unpaired():
178                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
179                     self._pair_nodes(record, arv_rec.arvados_node)
180                     break
181         for key, record in self.cloud_nodes.orphans.iteritems():
182             record.actor.stop()
183             record.cloud_node = None
184             self.shutdowns.pop(key, None)
185
186     def update_arvados_nodes(self, nodelist):
187         self._update_poll_time('arvados_nodes')
188         for key, node in self.arvados_nodes.update_from(nodelist):
189             self._logger.info("Registering new Arvados node %s", key)
190             record = _ComputeNodeRecord(arvados_node=node)
191             self.arvados_nodes.add(record)
192         for arv_rec in self.arvados_nodes.unpaired():
193             arv_node = arv_rec.arvados_node
194             for cloud_rec in self.cloud_nodes.unpaired():
195                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
196                     self._pair_nodes(cloud_rec, arv_node)
197                     break
198
199     def _nodes_up(self):
200         return sum(len(nodelist) for nodelist in
201                    [self.cloud_nodes, self.booted, self.booting])
202
203     def _nodes_busy(self):
204         return sum(1 for idle in
205                    pykka.get_all(rec.actor.in_state('idle') for rec in
206                                  self.cloud_nodes.nodes.itervalues())
207                    if idle is False)
208
209     def _nodes_wanted(self):
210         up_count = self._nodes_up()
211         under_min = self.min_nodes - up_count
212         over_max = up_count - self.max_nodes
213         if over_max >= 0:
214             return -over_max
215         elif under_min > 0:
216             return under_min
217         else:
218             up_count -= len(self.shutdowns) + self._nodes_busy()
219             return len(self.last_wishlist) - up_count
220
221     def _nodes_excess(self):
222         up_count = self._nodes_up() - len(self.shutdowns)
223         over_min = up_count - self.min_nodes
224         if over_min <= 0:
225             return over_min
226         else:
227             return up_count - self._nodes_busy() - len(self.last_wishlist)
228
229     def update_server_wishlist(self, wishlist):
230         self._update_poll_time('server_wishlist')
231         self.last_wishlist = wishlist
232         nodes_wanted = self._nodes_wanted()
233         if nodes_wanted > 0:
234             self._later.start_node()
235         elif (nodes_wanted < 0) and self.booting:
236             self._later.stop_booting_node()
237
238     def _check_poll_freshness(orig_func):
239         """Decorator to inhibit a method when poll information is stale.
240
241         This decorator checks the timestamps of all the poll information the
242         daemon has received.  The decorated method is only called if none
243         of the timestamps are considered stale.
244         """
245         @functools.wraps(orig_func)
246         def wrapper(self, *args, **kwargs):
247             now = time.time()
248             if all(now - t < self.poll_stale_after
249                    for t in self.last_polls.itervalues()):
250                 return orig_func(self, *args, **kwargs)
251             else:
252                 return None
253         return wrapper
254
255     @_check_poll_freshness
256     def start_node(self):
257         nodes_wanted = self._nodes_wanted()
258         if nodes_wanted < 1:
259             return None
260         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
261         try:
262             cloud_size = self.last_wishlist[self._nodes_up()]
263         except IndexError:
264             cloud_size = self.min_cloud_size
265         self._logger.info("Want %s more nodes.  Booting a %s node.",
266                           nodes_wanted, cloud_size.name)
267         new_setup = self._node_setup.start(
268             timer_actor=self._timer,
269             arvados_client=self._new_arvados(),
270             arvados_node=arvados_node,
271             cloud_client=self._new_cloud(),
272             cloud_size=cloud_size).proxy()
273         self.booting[new_setup.actor_ref.actor_urn] = new_setup
274         if arvados_node is not None:
275             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
276                 time.time())
277         new_setup.subscribe(self._later.node_up)
278         if nodes_wanted > 1:
279             self._later.start_node()
280
281     def _get_actor_attrs(self, actor, *attr_names):
282         return pykka.get_all([getattr(actor, name) for name in attr_names])
283
284     def node_up(self, setup_proxy):
285         cloud_node = setup_proxy.cloud_node.get()
286         del self.booting[setup_proxy.actor_ref.actor_urn]
287         setup_proxy.stop()
288         record = self.cloud_nodes.get(cloud_node.id)
289         if record is None:
290             record = self._new_node(cloud_node)
291             self.booted[cloud_node.id] = record
292         self._timer.schedule(time.time() + self.boot_fail_after,
293                              self._later.shutdown_unpaired_node, cloud_node.id)
294
295     @_check_poll_freshness
296     def stop_booting_node(self):
297         nodes_excess = self._nodes_excess()
298         if (nodes_excess < 1) or not self.booting:
299             return None
300         for key, node in self.booting.iteritems():
301             node.stop_if_no_cloud_node().get()
302             if not node.actor_ref.is_alive():
303                 del self.booting[key]
304                 if nodes_excess > 1:
305                     self._later.stop_booting_node()
306                 break
307
308     def _begin_node_shutdown(self, node_actor, cancellable):
309         cloud_node_id = node_actor.cloud_node.get().id
310         if cloud_node_id in self.shutdowns:
311             return None
312         shutdown = self._node_shutdown.start(
313             timer_actor=self._timer, cloud_client=self._new_cloud(),
314             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
315         self.shutdowns[cloud_node_id] = shutdown
316         shutdown.subscribe(self._later.node_finished_shutdown)
317
318     @_check_poll_freshness
319     def node_can_shutdown(self, node_actor):
320         if self._nodes_excess() > 0:
321             self._begin_node_shutdown(node_actor, cancellable=True)
322
323     def shutdown_unpaired_node(self, cloud_node_id):
324         for record_dict in [self.cloud_nodes, self.booted]:
325             if cloud_node_id in record_dict:
326                 record = record_dict[cloud_node_id]
327                 break
328         else:
329             return None
330         if record.arvados_node is None:
331             self._begin_node_shutdown(record.actor, cancellable=False)
332
333     def node_finished_shutdown(self, shutdown_actor):
334         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
335                                                     'cloud_node')
336         shutdown_actor.stop()
337         cloud_node_id = cloud_node.id
338         if not success:
339             del self.shutdowns[cloud_node_id]
340         elif cloud_node_id in self.booted:
341             self.booted.pop(cloud_node_id).actor.stop()
342             del self.shutdowns[cloud_node_id]
343
344     def shutdown(self):
345         self._logger.info("Shutting down after signal.")
346         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
347         for bootnode in self.booting.itervalues():
348             bootnode.stop_if_no_cloud_node()
349         self._later.await_shutdown()
350
351     def await_shutdown(self):
352         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
353             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
354         else:
355             self.stop()