Merge branch 'master' into 7167-keep-rsync-test-setup
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
class _BaseNodeTracker(object):
    """Common bookkeeping for a set of _ComputeNodeRecords.

    Subclasses must define RECORD_ATTR (the record attribute holding the
    tracked item), PAIR_ATTR (the record attribute holding the paired
    item), and item_key (a callable mapping a tracked item to its key in
    self.nodes).
    """
    def __init__(self):
        self.nodes = {}    # item key -> _ComputeNodeRecord
        self.orphans = {}  # records dropped by the latest update_from()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    # Runs at class-definition time: installs each dict proxy as a
    # regular method on this class, so e.g. `key in tracker` and
    # `tracker[key]` delegate to self.nodes.
    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        # A record's key is derived from the item it tracks.
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def update_record(self, key, item):
        # Replace the tracked item on an existing record in place.
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Update records from a polled item list; yield unknown items.

        Items with an existing record refresh that record; items without
        one are yielded as (key, item) pairs for the caller to register.
        Once the generator is exhausted, self.orphans holds the records
        whose items were absent from the response (removed from
        self.nodes).  Note: nothing happens until the caller iterates
        the returned generator.
        """
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        # Records that have not yet been matched with their counterpart
        # (cloud node <-> Arvados node).
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)
64
class _CloudNodeTracker(_BaseNodeTracker):
    """Track _ComputeNodeRecords keyed by cloud provider node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # Cloud nodes are keyed on the provider-assigned node ID.
        return cloud_node.id
69
70
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track _ComputeNodeRecords keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        # Arvados node records are keyed on their API UUID.
        return arvados_node['uuid']

    def find_stale_node(self, stale_time):
        """Return one Arvados node whose record looks stale, or None.

        A node is stale when both its last modification time and its
        last setup-assignment time are older than stale_time.
        """
        for record in self.nodes.itervalues():
            arv_node = record.arvados_node
            # Guard clauses preserve the original short-circuit order:
            # the assignment time is only checked when the mtime is
            # already stale.
            if cnode.timestamp_fresh(cnode.arvados_node_mtime(arv_node),
                                     stale_time):
                continue
            if cnode.timestamp_fresh(record.assignment_time, stale_time):
                continue
            return arv_node
        return None
85
86
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, min_size, min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        # Proxy to this actor itself: calling methods on _later queues
        # them as messages through the actor's own mailbox instead of
        # running them inline.
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.min_cloud_size = min_size
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        # Subscribe to each poll actor, and seed last_polls with a value
        # guaranteed to look stale so _check_poll_freshness-decorated
        # methods stay inhibited until the first real poll arrives.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")

    def _update_poll_time(self, poll_key):
        # Record the wall-clock time of the latest poll of this kind.
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        """Associate a cloud node record with its Arvados node record."""
        self._logger.info("Cloud node %s has associated with Arvados node %s",
                          node_record.cloud_node.id, arvados_node['uuid'])
        # Keep the monitor actor informed of future Arvados node updates.
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        """Start a monitor actor for cloud_node and return its record."""
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        # Let the monitor tell this daemon when its node becomes eligible
        # for shutdown, and keep it fed with cloud node poll updates.
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record

    def update_cloud_nodes(self, nodelist):
        """Handle a cloud node listing poll.

        Registers newly seen cloud nodes (reusing records for nodes we
        booted ourselves), tries to pair each new node with an unpaired
        Arvados node, and cleans up records for nodes the provider no
        longer reports.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                # We booted this node; it already has a monitor actor.
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                # .get() blocks on the monitor actor's answer.
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        # Nodes missing from the listing are gone from the cloud: stop
        # their shutdown and monitor actors and detach the cloud node.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        """Handle an Arvados node listing poll.

        Registers newly seen Arvados node records, then tries to pair
        every still-unpaired Arvados node with an unpaired cloud node.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

    def _nodes_up(self):
        # Count everything we consider "up": running, booted-but-unseen,
        # and still booting.
        return sum(len(nodelist) for nodelist in
                   [self.cloud_nodes, self.booted, self.booting])

    def _nodes_busy(self):
        # Count cloud nodes whose monitor reports the 'busy' state.
        # pykka.get_all blocks until all monitor actors answer.
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues())
                   if busy)

    def _nodes_missing(self):
        # Count paired nodes (excluding those already shutting down)
        # whose Arvados record indicates the node has gone missing.
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _nodes_wanted(self):
        """Return how many nodes to boot (positive) or shed (negative).

        min_nodes/max_nodes bounds take precedence; otherwise demand is
        the wishlist length minus nodes that are usable (up, not
        shutting down, not busy, not missing).
        """
        up_count = self._nodes_up()
        under_min = self.min_nodes - up_count
        over_max = up_count - self.max_nodes
        if over_max >= 0:
            return -over_max
        elif under_min > 0:
            return under_min
        else:
            up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
            return len(self.last_wishlist) - up_count

    def _nodes_excess(self):
        """Return how many idle nodes could be shut down (may be <= 0)."""
        up_count = self._nodes_up() - len(self.shutdowns)
        over_min = up_count - self.min_nodes
        if over_min <= 0:
            return over_min
        else:
            return up_count - self._nodes_busy() - len(self.last_wishlist)

    def update_server_wishlist(self, wishlist):
        """Handle a job queue poll: kick off a boot or a boot-cancel.

        Only one step is scheduled per poll; start_node/stop_booting_node
        re-schedule themselves while more work remains.
        """
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted > 0:
            self._later.start_node()
        elif (nodes_wanted < 0) and self.booting:
            self._later.stop_booting_node()

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self):
        """Boot one new cloud node, re-scheduling if more are wanted."""
        nodes_wanted = self._nodes_wanted()
        if nodes_wanted < 1:
            return None
        # Reuse a stale Arvados node record for the new node if one exists.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        try:
            cloud_size = self.last_wishlist[self._nodes_up()]
        except IndexError:
            # Wishlist is shorter than the node count (e.g. booting for
            # min_nodes): fall back to the smallest configured size.
            cloud_size = self.min_cloud_size
        self._logger.info("Want %s more nodes.  Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Mark the record assigned so find_stale_node won't hand it
            # to another setup actor while this boot is in progress.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node()

    def _get_actor_attrs(self, actor, *attr_names):
        # Fetch several attribute futures from an actor proxy in one
        # blocking batch.
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        """Handle a setup actor finishing: track the node it booted.

        Keeps the node in self.booted until the cloud listing reports it,
        and schedules a shutdown if it fails to pair in time.
        """
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        setup_proxy.stop()
        record = self.cloud_nodes.get(cloud_node.id)
        if record is None:
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self):
        """Cancel one in-progress boot, if any setup actor allows it."""
        nodes_excess = self._nodes_excess()
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            # Only cancels if the setup actor hasn't created a cloud
            # node yet; breaks out right after mutating self.booting.
            if node.stop_if_no_cloud_node().get():
                del self.booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node()
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for node_actor's cloud node.

        No-op if a shutdown is already underway for that node.
        cancellable indicates whether the shutdown may be aborted
        (e.g. if the node becomes busy again).
        """
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        # Monitor actor reports its node is eligible; only act if we
        # actually have excess capacity.
        if self._nodes_excess() > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down a booted node that never paired with Arvados.

        Scheduled by node_up to fire boot_fail_after seconds after boot;
        does nothing if the node has since started working.
        """
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            # Not cancellable: an unpaired node can't become useful.
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        """Handle a shutdown actor finishing (successfully or not)."""
        success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
                                                    'cloud_node')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            # Shutdown was cancelled/failed; allow a future retry.
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            # Node never appeared in a cloud listing, so update_cloud_nodes
            # won't clean it up; stop its monitor and forget it here.
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]

    def shutdown(self):
        """Begin daemon shutdown: stop booting and wait for setup actors."""
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        # Keep only setup actors that couldn't be stopped (they already
        # have a cloud node); await_shutdown polls until they finish.
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        # Re-check every second until no setup actors remain, then stop
        # this actor.
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()