2881: Add Node Manager service.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .config import actor_class
13
14 class _ComputeNodeRecord(object):
15     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16                  assignment_time=float('-inf')):
17         self.actor = actor
18         self.cloud_node = cloud_node
19         self.arvados_node = arvados_node
20         self.assignment_time = assignment_time
21
22
23 class _BaseNodeTracker(object):
24     def __init__(self):
25         self.nodes = {}
26         self.orphans = {}
27
28     def __getitem__(self, key):
29         return self.nodes[key]
30
31     def __len__(self):
32         return len(self.nodes)
33
34     def get(self, key, default=None):
35         return self.nodes.get(key, default)
36
37     def record_key(self, record):
38         return self.item_key(getattr(record, self.RECORD_ATTR))
39
40     def add(self, record):
41         self.nodes[self.record_key(record)] = record
42
43     def update_record(self, key, item):
44         setattr(self.nodes[key], self.RECORD_ATTR, item)
45
46     def update_from(self, response):
47         unseen = set(self.nodes.iterkeys())
48         for item in response:
49             key = self.item_key(item)
50             if key in unseen:
51                 unseen.remove(key)
52                 self.update_record(key, item)
53             else:
54                 yield key, item
55         self.orphans = {key: self.nodes.pop(key) for key in unseen}
56
57     def unpaired(self):
58         return (record for record in self.nodes.itervalues()
59                 if getattr(record, self.PAIR_ATTR) is None)
60
61
62 class _CloudNodeTracker(_BaseNodeTracker):
63     RECORD_ATTR = 'cloud_node'
64     PAIR_ATTR = 'arvados_node'
65     item_key = staticmethod(lambda cloud_node: cloud_node.id)
66
67
68 class _ArvadosNodeTracker(_BaseNodeTracker):
69     RECORD_ATTR = 'arvados_node'
70     PAIR_ATTR = 'cloud_node'
71     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
72
73     def find_stale_node(self, stale_time):
74         for record in self.nodes.itervalues():
75             node = record.arvados_node
76             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
77                                           stale_time) and
78                   not cnode.timestamp_fresh(record.assignment_time,
79                                             stale_time)):
80                 return node
81         return None
82
83
84 class NodeManagerDaemonActor(actor_class):
85     """Node Manager daemon.
86
87     This actor subscribes to all information polls about cloud nodes,
88     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
89     for every cloud node, subscribing them to poll updates
90     appropriately.  It creates and destroys cloud nodes based on job queue
91     demand, and stops the corresponding ComputeNode actors when their work
92     is done.
93     """
94     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95                  cloud_nodes_actor, cloud_update_actor, timer_actor,
96                  arvados_factory, cloud_factory,
97                  shutdown_windows, max_nodes,
98                  poll_stale_after=600, node_stale_after=7200,
99                  node_setup_class=cnode.ComputeNodeSetupActor,
100                  node_shutdown_class=cnode.ComputeNodeShutdownActor,
101                  node_actor_class=cnode.ComputeNodeMonitorActor):
102         super(NodeManagerDaemonActor, self).__init__()
103         self._node_setup = node_setup_class
104         self._node_shutdown = node_shutdown_class
105         self._node_actor = node_actor_class
106         self._cloud_updater = cloud_update_actor
107         self._timer = timer_actor
108         self._new_arvados = arvados_factory
109         self._new_cloud = cloud_factory
110         self._cloud_driver = self._new_cloud()
111         self._logger = logging.getLogger('arvnodeman.daemon')
112         self._later = self.actor_ref.proxy()
113         self.shutdown_windows = shutdown_windows
114         self.max_nodes = max_nodes
115         self.poll_stale_after = poll_stale_after
116         self.node_stale_after = node_stale_after
117         self.last_polls = {}
118         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
119             poll_actor = locals()[poll_name + '_actor']
120             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
121             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
122             self.last_polls[poll_name] = -self.poll_stale_after
123         self.cloud_nodes = _CloudNodeTracker()
124         self.arvados_nodes = _ArvadosNodeTracker()
125         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
126         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
127         self._logger.debug("Daemon initialized")
128
129     def _update_poll_time(self, poll_key):
130         self.last_polls[poll_key] = time.time()
131
132     def _pair_nodes(self, node_record, arvados_node):
133         self._logger.info("Cloud node %s has associated with Arvados node %s",
134                           node_record.cloud_node.id, arvados_node['uuid'])
135         self._arvados_nodes_actor.subscribe_to(
136             arvados_node['uuid'], node_record.actor.update_arvados_node)
137         node_record.arvados_node = arvados_node
138         self.arvados_nodes.add(node_record)
139
140     def _new_node(self, cloud_node):
141         start_time = self._cloud_driver.node_start_time(cloud_node)
142         shutdown_timer = cnode.ShutdownTimer(start_time,
143                                              self.shutdown_windows)
144         actor = self._node_actor.start(
145             cloud_node=cloud_node,
146             cloud_node_start_time=start_time,
147             shutdown_timer=shutdown_timer,
148             update_actor=self._cloud_updater,
149             timer_actor=self._timer,
150             arvados_node=None,
151             poll_stale_after=self.poll_stale_after,
152             node_stale_after=self.node_stale_after).proxy()
153         actor.subscribe(self._later.node_can_shutdown)
154         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
155                                              actor.update_cloud_node)
156         record = _ComputeNodeRecord(actor, cloud_node)
157         self.cloud_nodes.add(record)
158         return record
159
160     def update_cloud_nodes(self, nodelist):
161         self._update_poll_time('cloud_nodes')
162         for key, node in self.cloud_nodes.update_from(nodelist):
163             self._logger.info("Registering new cloud node %s", key)
164             record = self._new_node(node)
165             for arv_rec in self.arvados_nodes.unpaired():
166                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
167                     self._pair_nodes(record, arv_rec.arvados_node)
168                     break
169         for key, record in self.cloud_nodes.orphans.iteritems():
170             record.actor.stop()
171             if key in self.shutdowns:
172                 self.shutdowns.pop(key).stop()
173
174     def update_arvados_nodes(self, nodelist):
175         self._update_poll_time('arvados_nodes')
176         for key, node in self.arvados_nodes.update_from(nodelist):
177             self._logger.info("Registering new Arvados node %s", key)
178             record = _ComputeNodeRecord(arvados_node=node)
179             self.arvados_nodes.add(record)
180         for arv_rec in self.arvados_nodes.unpaired():
181             arv_node = arv_rec.arvados_node
182             for cloud_rec in self.cloud_nodes.unpaired():
183                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
184                     self._pair_nodes(cloud_rec, arv_node)
185                     break
186
187     def _node_count(self):
188         up = sum(len(nodelist) for nodelist in [self.cloud_nodes, self.booting])
189         return up - len(self.shutdowns)
190
191     def _nodes_wanted(self):
192         return len(self.last_wishlist) - self._node_count()
193
194     def _nodes_excess(self):
195         return -self._nodes_wanted()
196
197     def update_server_wishlist(self, wishlist):
198         self._update_poll_time('server_wishlist')
199         self.last_wishlist = wishlist[:self.max_nodes]
200         nodes_wanted = self._nodes_wanted()
201         if nodes_wanted > 0:
202             self._later.start_node()
203         elif (nodes_wanted < 0) and self.booting:
204             self._later.stop_booting_node()
205
206     def _check_poll_freshness(orig_func):
207         """Decorator to inhibit a method when poll information is stale.
208
209         This decorator checks the timestamps of all the poll information the
210         daemon has received.  The decorated method is only called if none
211         of the timestamps are considered stale.
212         """
213         @functools.wraps(orig_func)
214         def wrapper(self, *args, **kwargs):
215             now = time.time()
216             if all(now - t < self.poll_stale_after
217                    for t in self.last_polls.itervalues()):
218                 return orig_func(self, *args, **kwargs)
219             else:
220                 return None
221         return wrapper
222
223     @_check_poll_freshness
224     def start_node(self):
225         nodes_wanted = self._nodes_wanted()
226         if nodes_wanted < 1:
227             return None
228         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
229         cloud_size = self.last_wishlist[nodes_wanted - 1]
230         self._logger.info("Want %s more nodes.  Booting a %s node.",
231                           nodes_wanted, cloud_size.name)
232         new_setup = self._node_setup.start(
233             timer_actor=self._timer,
234             arvados_client=self._new_arvados(),
235             arvados_node=arvados_node,
236             cloud_client=self._new_cloud(),
237             cloud_size=cloud_size).proxy()
238         self.booting[new_setup.actor_ref.actor_urn] = new_setup
239         if arvados_node is not None:
240             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
241                 time.time())
242         new_setup.subscribe(self._later.node_up)
243         if nodes_wanted > 1:
244             self._later.start_node()
245
246     def _actor_nodes(self, node_actor):
247         return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
248
249     def node_up(self, setup_proxy):
250         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
251         del self.booting[setup_proxy.actor_ref.actor_urn]
252         setup_proxy.stop()
253         record = self.cloud_nodes.get(cloud_node.id)
254         if record is None:
255             record = self._new_node(cloud_node)
256         self._pair_nodes(record, arvados_node)
257
258     @_check_poll_freshness
259     def stop_booting_node(self):
260         nodes_excess = self._nodes_excess()
261         if (nodes_excess < 1) or not self.booting:
262             return None
263         for key, node in self.booting.iteritems():
264             node.stop_if_no_cloud_node().get()
265             if not node.actor_ref.is_alive():
266                 del self.booting[key]
267                 if nodes_excess > 1:
268                     self._later.stop_booting_node()
269                 break
270
271     @_check_poll_freshness
272     def node_can_shutdown(self, node_actor):
273         if self._nodes_excess() < 1:
274             return None
275         cloud_node, arvados_node = self._actor_nodes(node_actor)
276         if cloud_node.id in self.shutdowns:
277             return None
278         shutdown = self._node_shutdown.start(timer_actor=self._timer,
279                                              cloud_client=self._new_cloud(),
280                                              cloud_node=cloud_node).proxy()
281         self.shutdowns[cloud_node.id] = shutdown
282
283     def shutdown(self):
284         self._logger.info("Shutting down after signal.")
285         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
286         for bootnode in self.booting.itervalues():
287             bootnode.stop_if_no_cloud_node()
288         self._later.await_shutdown()
289
290     def await_shutdown(self):
291         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
292             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
293         else:
294             self.stop()