4389: Merge branch 'master' into 4389-breadcrumbs-infinite-loop
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .config import actor_class
13
14 class _ComputeNodeRecord(object):
15     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16                  assignment_time=float('-inf')):
17         self.actor = actor
18         self.cloud_node = cloud_node
19         self.arvados_node = arvados_node
20         self.assignment_time = assignment_time
21
22
23 class _BaseNodeTracker(object):
24     def __init__(self):
25         self.nodes = {}
26         self.orphans = {}
27
28     def __getitem__(self, key):
29         return self.nodes[key]
30
31     def __len__(self):
32         return len(self.nodes)
33
34     def get(self, key, default=None):
35         return self.nodes.get(key, default)
36
37     def record_key(self, record):
38         return self.item_key(getattr(record, self.RECORD_ATTR))
39
40     def add(self, record):
41         self.nodes[self.record_key(record)] = record
42
43     def update_record(self, key, item):
44         setattr(self.nodes[key], self.RECORD_ATTR, item)
45
46     def update_from(self, response):
47         unseen = set(self.nodes.iterkeys())
48         for item in response:
49             key = self.item_key(item)
50             if key in unseen:
51                 unseen.remove(key)
52                 self.update_record(key, item)
53             else:
54                 yield key, item
55         self.orphans = {key: self.nodes.pop(key) for key in unseen}
56
57     def unpaired(self):
58         return (record for record in self.nodes.itervalues()
59                 if getattr(record, self.PAIR_ATTR) is None)
60
61
62 class _CloudNodeTracker(_BaseNodeTracker):
63     RECORD_ATTR = 'cloud_node'
64     PAIR_ATTR = 'arvados_node'
65     item_key = staticmethod(lambda cloud_node: cloud_node.id)
66
67
68 class _ArvadosNodeTracker(_BaseNodeTracker):
69     RECORD_ATTR = 'arvados_node'
70     PAIR_ATTR = 'cloud_node'
71     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
72
73     def find_stale_node(self, stale_time):
74         for record in self.nodes.itervalues():
75             node = record.arvados_node
76             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
77                                           stale_time) and
78                   not cnode.timestamp_fresh(record.assignment_time,
79                                             stale_time)):
80                 return node
81         return None
82
83
84 class NodeManagerDaemonActor(actor_class):
85     """Node Manager daemon.
86
87     This actor subscribes to all information polls about cloud nodes,
88     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
89     for every cloud node, subscribing them to poll updates
90     appropriately.  It creates and destroys cloud nodes based on job queue
91     demand, and stops the corresponding ComputeNode actors when their work
92     is done.
93     """
94     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95                  cloud_nodes_actor, cloud_update_actor, timer_actor,
96                  arvados_factory, cloud_factory,
97                  shutdown_windows, max_nodes,
98                  poll_stale_after=600, node_stale_after=7200,
99                  node_setup_class=cnode.ComputeNodeSetupActor,
100                  node_shutdown_class=cnode.ComputeNodeShutdownActor,
101                  node_actor_class=cnode.ComputeNodeMonitorActor):
102         super(NodeManagerDaemonActor, self).__init__()
103         self._node_setup = node_setup_class
104         self._node_shutdown = node_shutdown_class
105         self._node_actor = node_actor_class
106         self._cloud_updater = cloud_update_actor
107         self._timer = timer_actor
108         self._new_arvados = arvados_factory
109         self._new_cloud = cloud_factory
110         self._cloud_driver = self._new_cloud()
111         self._logger = logging.getLogger('arvnodeman.daemon')
112         self._later = self.actor_ref.proxy()
113         self.shutdown_windows = shutdown_windows
114         self.max_nodes = max_nodes
115         self.poll_stale_after = poll_stale_after
116         self.node_stale_after = node_stale_after
117         self.last_polls = {}
118         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
119             poll_actor = locals()[poll_name + '_actor']
120             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
121             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
122             self.last_polls[poll_name] = -self.poll_stale_after
123         self.cloud_nodes = _CloudNodeTracker()
124         self.arvados_nodes = _ArvadosNodeTracker()
125         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
126         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
127         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
128         self._logger.debug("Daemon initialized")
129
130     def _update_poll_time(self, poll_key):
131         self.last_polls[poll_key] = time.time()
132
133     def _pair_nodes(self, node_record, arvados_node):
134         self._logger.info("Cloud node %s has associated with Arvados node %s",
135                           node_record.cloud_node.id, arvados_node['uuid'])
136         self._arvados_nodes_actor.subscribe_to(
137             arvados_node['uuid'], node_record.actor.update_arvados_node)
138         node_record.arvados_node = arvados_node
139         self.arvados_nodes.add(node_record)
140
141     def _new_node(self, cloud_node):
142         start_time = self._cloud_driver.node_start_time(cloud_node)
143         shutdown_timer = cnode.ShutdownTimer(start_time,
144                                              self.shutdown_windows)
145         actor = self._node_actor.start(
146             cloud_node=cloud_node,
147             cloud_node_start_time=start_time,
148             shutdown_timer=shutdown_timer,
149             update_actor=self._cloud_updater,
150             timer_actor=self._timer,
151             arvados_node=None,
152             poll_stale_after=self.poll_stale_after,
153             node_stale_after=self.node_stale_after).proxy()
154         actor.subscribe(self._later.node_can_shutdown)
155         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
156                                              actor.update_cloud_node)
157         record = _ComputeNodeRecord(actor, cloud_node)
158         return record
159
160     def update_cloud_nodes(self, nodelist):
161         self._update_poll_time('cloud_nodes')
162         for key, node in self.cloud_nodes.update_from(nodelist):
163             self._logger.info("Registering new cloud node %s", key)
164             if key in self.booted:
165                 record = self.booted.pop(key)
166             else:
167                 record = self._new_node(node)
168             self.cloud_nodes.add(record)
169             for arv_rec in self.arvados_nodes.unpaired():
170                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
171                     self._pair_nodes(record, arv_rec.arvados_node)
172                     break
173         for key, record in self.cloud_nodes.orphans.iteritems():
174             record.actor.stop()
175             self.shutdowns.pop(key, None)
176
177     def update_arvados_nodes(self, nodelist):
178         self._update_poll_time('arvados_nodes')
179         for key, node in self.arvados_nodes.update_from(nodelist):
180             self._logger.info("Registering new Arvados node %s", key)
181             record = _ComputeNodeRecord(arvados_node=node)
182             self.arvados_nodes.add(record)
183         for arv_rec in self.arvados_nodes.unpaired():
184             arv_node = arv_rec.arvados_node
185             for cloud_rec in self.cloud_nodes.unpaired():
186                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
187                     self._pair_nodes(cloud_rec, arv_node)
188                     break
189
190     def _nodes_up(self):
191         up = sum(len(nodelist) for nodelist in
192                  [self.cloud_nodes, self.booted, self.booting])
193         return up - len(self.shutdowns)
194
195     def _nodes_busy(self):
196         return sum(1 for idle in
197                    pykka.get_all(rec.actor.in_state('idle') for rec in
198                                  self.cloud_nodes.nodes.itervalues())
199                    if idle is False)
200
201     def _nodes_wanted(self):
202         return min(len(self.last_wishlist) + self._nodes_busy(),
203                    self.max_nodes) - self._nodes_up()
204
205     def _nodes_excess(self):
206         return self._nodes_up() - self._nodes_busy() - len(self.last_wishlist)
207
208     def update_server_wishlist(self, wishlist):
209         self._update_poll_time('server_wishlist')
210         self.last_wishlist = wishlist
211         nodes_wanted = self._nodes_wanted()
212         if nodes_wanted > 0:
213             self._later.start_node()
214         elif (nodes_wanted < 0) and self.booting:
215             self._later.stop_booting_node()
216
217     def _check_poll_freshness(orig_func):
218         """Decorator to inhibit a method when poll information is stale.
219
220         This decorator checks the timestamps of all the poll information the
221         daemon has received.  The decorated method is only called if none
222         of the timestamps are considered stale.
223         """
224         @functools.wraps(orig_func)
225         def wrapper(self, *args, **kwargs):
226             now = time.time()
227             if all(now - t < self.poll_stale_after
228                    for t in self.last_polls.itervalues()):
229                 return orig_func(self, *args, **kwargs)
230             else:
231                 return None
232         return wrapper
233
234     @_check_poll_freshness
235     def start_node(self):
236         nodes_wanted = self._nodes_wanted()
237         if nodes_wanted < 1:
238             return None
239         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
240         cloud_size = self.last_wishlist[nodes_wanted - 1]
241         self._logger.info("Want %s more nodes.  Booting a %s node.",
242                           nodes_wanted, cloud_size.name)
243         new_setup = self._node_setup.start(
244             timer_actor=self._timer,
245             arvados_client=self._new_arvados(),
246             arvados_node=arvados_node,
247             cloud_client=self._new_cloud(),
248             cloud_size=cloud_size).proxy()
249         self.booting[new_setup.actor_ref.actor_urn] = new_setup
250         if arvados_node is not None:
251             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
252                 time.time())
253         new_setup.subscribe(self._later.node_up)
254         if nodes_wanted > 1:
255             self._later.start_node()
256
257     def _actor_nodes(self, node_actor):
258         return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
259
260     def node_up(self, setup_proxy):
261         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
262         del self.booting[setup_proxy.actor_ref.actor_urn]
263         setup_proxy.stop()
264         record = self.cloud_nodes.get(cloud_node.id)
265         if record is None:
266             record = self._new_node(cloud_node)
267             self.booted[cloud_node.id] = record
268         self._pair_nodes(record, arvados_node)
269
270     @_check_poll_freshness
271     def stop_booting_node(self):
272         nodes_excess = self._nodes_excess()
273         if (nodes_excess < 1) or not self.booting:
274             return None
275         for key, node in self.booting.iteritems():
276             node.stop_if_no_cloud_node().get()
277             if not node.actor_ref.is_alive():
278                 del self.booting[key]
279                 if nodes_excess > 1:
280                     self._later.stop_booting_node()
281                 break
282
283     @_check_poll_freshness
284     def node_can_shutdown(self, node_actor):
285         if self._nodes_excess() < 1:
286             return None
287         cloud_node, arvados_node = self._actor_nodes(node_actor)
288         if cloud_node.id in self.shutdowns:
289             return None
290         shutdown = self._node_shutdown.start(timer_actor=self._timer,
291                                              cloud_client=self._new_cloud(),
292                                              cloud_node=cloud_node).proxy()
293         self.shutdowns[cloud_node.id] = shutdown
294         shutdown.subscribe(self._later.node_finished_shutdown)
295
296     def node_finished_shutdown(self, shutdown_actor):
297         cloud_node_id = shutdown_actor.cloud_node.get().id
298         shutdown_actor.stop()
299         if cloud_node_id in self.booted:
300             self.booted.pop(cloud_node_id).actor.stop()
301             del self.shutdowns[cloud_node_id]
302
303     def shutdown(self):
304         self._logger.info("Shutting down after signal.")
305         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
306         for bootnode in self.booting.itervalues():
307             bootnode.stop_if_no_cloud_node()
308         self._later.await_shutdown()
309
310     def await_shutdown(self):
311         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
312             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
313         else:
314             self.stop()