Merge branch '4380-node-manager-computenode-reorg-wip'
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     def __getitem__(self, key):
30         return self.nodes[key]
31
32     def __len__(self):
33         return len(self.nodes)
34
35     def get(self, key, default=None):
36         return self.nodes.get(key, default)
37
38     def record_key(self, record):
39         return self.item_key(getattr(record, self.RECORD_ATTR))
40
41     def add(self, record):
42         self.nodes[self.record_key(record)] = record
43
44     def update_record(self, key, item):
45         setattr(self.nodes[key], self.RECORD_ATTR, item)
46
47     def update_from(self, response):
48         unseen = set(self.nodes.iterkeys())
49         for item in response:
50             key = self.item_key(item)
51             if key in unseen:
52                 unseen.remove(key)
53                 self.update_record(key, item)
54             else:
55                 yield key, item
56         self.orphans = {key: self.nodes.pop(key) for key in unseen}
57
58     def unpaired(self):
59         return (record for record in self.nodes.itervalues()
60                 if getattr(record, self.PAIR_ATTR) is None)
61
62
63 class _CloudNodeTracker(_BaseNodeTracker):
64     RECORD_ATTR = 'cloud_node'
65     PAIR_ATTR = 'arvados_node'
66     item_key = staticmethod(lambda cloud_node: cloud_node.id)
67
68
69 class _ArvadosNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'arvados_node'
71     PAIR_ATTR = 'cloud_node'
72     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
73
74     def find_stale_node(self, stale_time):
75         for record in self.nodes.itervalues():
76             node = record.arvados_node
77             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
78                                           stale_time) and
79                   not cnode.timestamp_fresh(record.assignment_time,
80                                             stale_time)):
81                 return node
82         return None
83
84
85 class NodeManagerDaemonActor(actor_class):
86     """Node Manager daemon.
87
88     This actor subscribes to all information polls about cloud nodes,
89     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
90     for every cloud node, subscribing them to poll updates
91     appropriately.  It creates and destroys cloud nodes based on job queue
92     demand, and stops the corresponding ComputeNode actors when their work
93     is done.
94     """
95     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
96                  cloud_nodes_actor, cloud_update_actor, timer_actor,
97                  arvados_factory, cloud_factory,
98                  shutdown_windows, min_nodes, max_nodes,
99                  poll_stale_after=600, node_stale_after=7200,
100                  node_setup_class=dispatch.ComputeNodeSetupActor,
101                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
102                  node_actor_class=dispatch.ComputeNodeMonitorActor):
103         super(NodeManagerDaemonActor, self).__init__()
104         self._node_setup = node_setup_class
105         self._node_shutdown = node_shutdown_class
106         self._node_actor = node_actor_class
107         self._cloud_updater = cloud_update_actor
108         self._timer = timer_actor
109         self._new_arvados = arvados_factory
110         self._new_cloud = cloud_factory
111         self._cloud_driver = self._new_cloud()
112         self._logger = logging.getLogger('arvnodeman.daemon')
113         self._later = self.actor_ref.proxy()
114         self.shutdown_windows = shutdown_windows
115         self.min_nodes = min_nodes
116         self.max_nodes = max_nodes
117         self.poll_stale_after = poll_stale_after
118         self.node_stale_after = node_stale_after
119         self.last_polls = {}
120         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
121             poll_actor = locals()[poll_name + '_actor']
122             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
123             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
124             self.last_polls[poll_name] = -self.poll_stale_after
125         self.cloud_nodes = _CloudNodeTracker()
126         self.arvados_nodes = _ArvadosNodeTracker()
127         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
128         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
129         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
130         self._logger.debug("Daemon initialized")
131
132     def _update_poll_time(self, poll_key):
133         self.last_polls[poll_key] = time.time()
134
135     def _pair_nodes(self, node_record, arvados_node):
136         self._logger.info("Cloud node %s has associated with Arvados node %s",
137                           node_record.cloud_node.id, arvados_node['uuid'])
138         self._arvados_nodes_actor.subscribe_to(
139             arvados_node['uuid'], node_record.actor.update_arvados_node)
140         node_record.arvados_node = arvados_node
141         self.arvados_nodes.add(node_record)
142
143     def _new_node(self, cloud_node):
144         start_time = self._cloud_driver.node_start_time(cloud_node)
145         shutdown_timer = cnode.ShutdownTimer(start_time,
146                                              self.shutdown_windows)
147         actor = self._node_actor.start(
148             cloud_node=cloud_node,
149             cloud_node_start_time=start_time,
150             shutdown_timer=shutdown_timer,
151             update_actor=self._cloud_updater,
152             timer_actor=self._timer,
153             arvados_node=None,
154             poll_stale_after=self.poll_stale_after,
155             node_stale_after=self.node_stale_after).proxy()
156         actor.subscribe(self._later.node_can_shutdown)
157         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
158                                              actor.update_cloud_node)
159         record = _ComputeNodeRecord(actor, cloud_node)
160         return record
161
162     def update_cloud_nodes(self, nodelist):
163         self._update_poll_time('cloud_nodes')
164         for key, node in self.cloud_nodes.update_from(nodelist):
165             self._logger.info("Registering new cloud node %s", key)
166             if key in self.booted:
167                 record = self.booted.pop(key)
168             else:
169                 record = self._new_node(node)
170             self.cloud_nodes.add(record)
171             for arv_rec in self.arvados_nodes.unpaired():
172                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
173                     self._pair_nodes(record, arv_rec.arvados_node)
174                     break
175         for key, record in self.cloud_nodes.orphans.iteritems():
176             record.actor.stop()
177             self.shutdowns.pop(key, None)
178
179     def update_arvados_nodes(self, nodelist):
180         self._update_poll_time('arvados_nodes')
181         for key, node in self.arvados_nodes.update_from(nodelist):
182             self._logger.info("Registering new Arvados node %s", key)
183             record = _ComputeNodeRecord(arvados_node=node)
184             self.arvados_nodes.add(record)
185         for arv_rec in self.arvados_nodes.unpaired():
186             arv_node = arv_rec.arvados_node
187             for cloud_rec in self.cloud_nodes.unpaired():
188                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
189                     self._pair_nodes(cloud_rec, arv_node)
190                     break
191
192     def _nodes_up(self):
193         up = sum(len(nodelist) for nodelist in
194                  [self.cloud_nodes, self.booted, self.booting])
195         return up - len(self.shutdowns)
196
197     def _nodes_busy(self):
198         return sum(1 for idle in
199                    pykka.get_all(rec.actor.in_state('idle') for rec in
200                                  self.cloud_nodes.nodes.itervalues())
201                    if idle is False)
202
203     def _nodes_wanted(self):
204         return min(len(self.last_wishlist) + self._nodes_busy(),
205                    self.max_nodes) - self._nodes_up()
206
207     def _nodes_excess(self):
208         needed_nodes = self._nodes_busy() + len(self.last_wishlist)
209         return (self._nodes_up() - max(self.min_nodes, needed_nodes))
210
211     def update_server_wishlist(self, wishlist):
212         self._update_poll_time('server_wishlist')
213         self.last_wishlist = wishlist
214         nodes_wanted = self._nodes_wanted()
215         if nodes_wanted > 0:
216             self._later.start_node()
217         elif (nodes_wanted < 0) and self.booting:
218             self._later.stop_booting_node()
219
220     def _check_poll_freshness(orig_func):
221         """Decorator to inhibit a method when poll information is stale.
222
223         This decorator checks the timestamps of all the poll information the
224         daemon has received.  The decorated method is only called if none
225         of the timestamps are considered stale.
226         """
227         @functools.wraps(orig_func)
228         def wrapper(self, *args, **kwargs):
229             now = time.time()
230             if all(now - t < self.poll_stale_after
231                    for t in self.last_polls.itervalues()):
232                 return orig_func(self, *args, **kwargs)
233             else:
234                 return None
235         return wrapper
236
237     @_check_poll_freshness
238     def start_node(self):
239         nodes_wanted = self._nodes_wanted()
240         if nodes_wanted < 1:
241             return None
242         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
243         cloud_size = self.last_wishlist[nodes_wanted - 1]
244         self._logger.info("Want %s more nodes.  Booting a %s node.",
245                           nodes_wanted, cloud_size.name)
246         new_setup = self._node_setup.start(
247             timer_actor=self._timer,
248             arvados_client=self._new_arvados(),
249             arvados_node=arvados_node,
250             cloud_client=self._new_cloud(),
251             cloud_size=cloud_size).proxy()
252         self.booting[new_setup.actor_ref.actor_urn] = new_setup
253         if arvados_node is not None:
254             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
255                 time.time())
256         new_setup.subscribe(self._later.node_up)
257         if nodes_wanted > 1:
258             self._later.start_node()
259
260     def _actor_nodes(self, node_actor):
261         return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
262
263     def node_up(self, setup_proxy):
264         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
265         del self.booting[setup_proxy.actor_ref.actor_urn]
266         setup_proxy.stop()
267         record = self.cloud_nodes.get(cloud_node.id)
268         if record is None:
269             record = self._new_node(cloud_node)
270             self.booted[cloud_node.id] = record
271         self._pair_nodes(record, arvados_node)
272
273     @_check_poll_freshness
274     def stop_booting_node(self):
275         nodes_excess = self._nodes_excess()
276         if (nodes_excess < 1) or not self.booting:
277             return None
278         for key, node in self.booting.iteritems():
279             node.stop_if_no_cloud_node().get()
280             if not node.actor_ref.is_alive():
281                 del self.booting[key]
282                 if nodes_excess > 1:
283                     self._later.stop_booting_node()
284                 break
285
286     @_check_poll_freshness
287     def node_can_shutdown(self, node_actor):
288         if self._nodes_excess() < 1:
289             return None
290         cloud_node, arvados_node = self._actor_nodes(node_actor)
291         if cloud_node.id in self.shutdowns:
292             return None
293         shutdown = self._node_shutdown.start(timer_actor=self._timer,
294                                              cloud_client=self._new_cloud(),
295                                              cloud_node=cloud_node).proxy()
296         self.shutdowns[cloud_node.id] = shutdown
297         shutdown.subscribe(self._later.node_finished_shutdown)
298
299     def node_finished_shutdown(self, shutdown_actor):
300         cloud_node_id = shutdown_actor.cloud_node.get().id
301         shutdown_actor.stop()
302         if cloud_node_id in self.booted:
303             self.booted.pop(cloud_node_id).actor.stop()
304             del self.shutdowns[cloud_node_id]
305
306     def shutdown(self):
307         self._logger.info("Shutting down after signal.")
308         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
309         for bootnode in self.booting.itervalues():
310             bootnode.stop_if_no_cloud_node()
311         self._later.await_shutdown()
312
313     def await_shutdown(self):
314         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
315             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
316         else:
317             self.stop()