Merge branch '4434-collation' closes #4434
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .config import actor_class
13
14 class _ComputeNodeRecord(object):
15     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16                  assignment_time=float('-inf')):
17         self.actor = actor
18         self.cloud_node = cloud_node
19         self.arvados_node = arvados_node
20         self.assignment_time = assignment_time
21
22
23 class _BaseNodeTracker(object):
24     def __init__(self):
25         self.nodes = {}
26         self.orphans = {}
27
28     def __getitem__(self, key):
29         return self.nodes[key]
30
31     def __len__(self):
32         return len(self.nodes)
33
34     def get(self, key, default=None):
35         return self.nodes.get(key, default)
36
37     def record_key(self, record):
38         return self.item_key(getattr(record, self.RECORD_ATTR))
39
40     def add(self, record):
41         self.nodes[self.record_key(record)] = record
42
43     def update_record(self, key, item):
44         setattr(self.nodes[key], self.RECORD_ATTR, item)
45
46     def update_from(self, response):
47         unseen = set(self.nodes.iterkeys())
48         for item in response:
49             key = self.item_key(item)
50             if key in unseen:
51                 unseen.remove(key)
52                 self.update_record(key, item)
53             else:
54                 yield key, item
55         self.orphans = {key: self.nodes.pop(key) for key in unseen}
56
57     def unpaired(self):
58         return (record for record in self.nodes.itervalues()
59                 if getattr(record, self.PAIR_ATTR) is None)
60
61
62 class _CloudNodeTracker(_BaseNodeTracker):
63     RECORD_ATTR = 'cloud_node'
64     PAIR_ATTR = 'arvados_node'
65     item_key = staticmethod(lambda cloud_node: cloud_node.id)
66
67
68 class _ArvadosNodeTracker(_BaseNodeTracker):
69     RECORD_ATTR = 'arvados_node'
70     PAIR_ATTR = 'cloud_node'
71     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
72
73     def find_stale_node(self, stale_time):
74         for record in self.nodes.itervalues():
75             node = record.arvados_node
76             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
77                                           stale_time) and
78                   not cnode.timestamp_fresh(record.assignment_time,
79                                             stale_time)):
80                 return node
81         return None
82
83
84 class NodeManagerDaemonActor(actor_class):
85     """Node Manager daemon.
86
87     This actor subscribes to all information polls about cloud nodes,
88     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
89     for every cloud node, subscribing them to poll updates
90     appropriately.  It creates and destroys cloud nodes based on job queue
91     demand, and stops the corresponding ComputeNode actors when their work
92     is done.
93     """
94     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95                  cloud_nodes_actor, cloud_update_actor, timer_actor,
96                  arvados_factory, cloud_factory,
97                  shutdown_windows, min_nodes, max_nodes,
98                  poll_stale_after=600, node_stale_after=7200,
99                  node_setup_class=cnode.ComputeNodeSetupActor,
100                  node_shutdown_class=cnode.ComputeNodeShutdownActor,
101                  node_actor_class=cnode.ComputeNodeMonitorActor):
102         super(NodeManagerDaemonActor, self).__init__()
103         self._node_setup = node_setup_class
104         self._node_shutdown = node_shutdown_class
105         self._node_actor = node_actor_class
106         self._cloud_updater = cloud_update_actor
107         self._timer = timer_actor
108         self._new_arvados = arvados_factory
109         self._new_cloud = cloud_factory
110         self._cloud_driver = self._new_cloud()
111         self._logger = logging.getLogger('arvnodeman.daemon')
112         self._later = self.actor_ref.proxy()
113         self.shutdown_windows = shutdown_windows
114         self.min_nodes = min_nodes
115         self.max_nodes = max_nodes
116         self.poll_stale_after = poll_stale_after
117         self.node_stale_after = node_stale_after
118         self.last_polls = {}
119         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
120             poll_actor = locals()[poll_name + '_actor']
121             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
122             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
123             self.last_polls[poll_name] = -self.poll_stale_after
124         self.cloud_nodes = _CloudNodeTracker()
125         self.arvados_nodes = _ArvadosNodeTracker()
126         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
127         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
128         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
129         self._logger.debug("Daemon initialized")
130
131     def _update_poll_time(self, poll_key):
132         self.last_polls[poll_key] = time.time()
133
134     def _pair_nodes(self, node_record, arvados_node):
135         self._logger.info("Cloud node %s has associated with Arvados node %s",
136                           node_record.cloud_node.id, arvados_node['uuid'])
137         self._arvados_nodes_actor.subscribe_to(
138             arvados_node['uuid'], node_record.actor.update_arvados_node)
139         node_record.arvados_node = arvados_node
140         self.arvados_nodes.add(node_record)
141
142     def _new_node(self, cloud_node):
143         start_time = self._cloud_driver.node_start_time(cloud_node)
144         shutdown_timer = cnode.ShutdownTimer(start_time,
145                                              self.shutdown_windows)
146         actor = self._node_actor.start(
147             cloud_node=cloud_node,
148             cloud_node_start_time=start_time,
149             shutdown_timer=shutdown_timer,
150             update_actor=self._cloud_updater,
151             timer_actor=self._timer,
152             arvados_node=None,
153             poll_stale_after=self.poll_stale_after,
154             node_stale_after=self.node_stale_after).proxy()
155         actor.subscribe(self._later.node_can_shutdown)
156         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
157                                              actor.update_cloud_node)
158         record = _ComputeNodeRecord(actor, cloud_node)
159         return record
160
161     def update_cloud_nodes(self, nodelist):
162         self._update_poll_time('cloud_nodes')
163         for key, node in self.cloud_nodes.update_from(nodelist):
164             self._logger.info("Registering new cloud node %s", key)
165             if key in self.booted:
166                 record = self.booted.pop(key)
167             else:
168                 record = self._new_node(node)
169             self.cloud_nodes.add(record)
170             for arv_rec in self.arvados_nodes.unpaired():
171                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
172                     self._pair_nodes(record, arv_rec.arvados_node)
173                     break
174         for key, record in self.cloud_nodes.orphans.iteritems():
175             record.actor.stop()
176             self.shutdowns.pop(key, None)
177
178     def update_arvados_nodes(self, nodelist):
179         self._update_poll_time('arvados_nodes')
180         for key, node in self.arvados_nodes.update_from(nodelist):
181             self._logger.info("Registering new Arvados node %s", key)
182             record = _ComputeNodeRecord(arvados_node=node)
183             self.arvados_nodes.add(record)
184         for arv_rec in self.arvados_nodes.unpaired():
185             arv_node = arv_rec.arvados_node
186             for cloud_rec in self.cloud_nodes.unpaired():
187                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
188                     self._pair_nodes(cloud_rec, arv_node)
189                     break
190
191     def _nodes_up(self):
192         up = sum(len(nodelist) for nodelist in
193                  [self.cloud_nodes, self.booted, self.booting])
194         return up - len(self.shutdowns)
195
196     def _nodes_busy(self):
197         return sum(1 for idle in
198                    pykka.get_all(rec.actor.in_state('idle') for rec in
199                                  self.cloud_nodes.nodes.itervalues())
200                    if idle is False)
201
202     def _nodes_wanted(self):
203         return min(len(self.last_wishlist) + self._nodes_busy(),
204                    self.max_nodes) - self._nodes_up()
205
206     def _nodes_excess(self):
207         needed_nodes = self._nodes_busy() + len(self.last_wishlist)
208         return (self._nodes_up() - max(self.min_nodes, needed_nodes))
209
210     def update_server_wishlist(self, wishlist):
211         self._update_poll_time('server_wishlist')
212         self.last_wishlist = wishlist
213         nodes_wanted = self._nodes_wanted()
214         if nodes_wanted > 0:
215             self._later.start_node()
216         elif (nodes_wanted < 0) and self.booting:
217             self._later.stop_booting_node()
218
219     def _check_poll_freshness(orig_func):
220         """Decorator to inhibit a method when poll information is stale.
221
222         This decorator checks the timestamps of all the poll information the
223         daemon has received.  The decorated method is only called if none
224         of the timestamps are considered stale.
225         """
226         @functools.wraps(orig_func)
227         def wrapper(self, *args, **kwargs):
228             now = time.time()
229             if all(now - t < self.poll_stale_after
230                    for t in self.last_polls.itervalues()):
231                 return orig_func(self, *args, **kwargs)
232             else:
233                 return None
234         return wrapper
235
236     @_check_poll_freshness
237     def start_node(self):
238         nodes_wanted = self._nodes_wanted()
239         if nodes_wanted < 1:
240             return None
241         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
242         cloud_size = self.last_wishlist[nodes_wanted - 1]
243         self._logger.info("Want %s more nodes.  Booting a %s node.",
244                           nodes_wanted, cloud_size.name)
245         new_setup = self._node_setup.start(
246             timer_actor=self._timer,
247             arvados_client=self._new_arvados(),
248             arvados_node=arvados_node,
249             cloud_client=self._new_cloud(),
250             cloud_size=cloud_size).proxy()
251         self.booting[new_setup.actor_ref.actor_urn] = new_setup
252         if arvados_node is not None:
253             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
254                 time.time())
255         new_setup.subscribe(self._later.node_up)
256         if nodes_wanted > 1:
257             self._later.start_node()
258
259     def _actor_nodes(self, node_actor):
260         return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
261
262     def node_up(self, setup_proxy):
263         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
264         del self.booting[setup_proxy.actor_ref.actor_urn]
265         setup_proxy.stop()
266         record = self.cloud_nodes.get(cloud_node.id)
267         if record is None:
268             record = self._new_node(cloud_node)
269             self.booted[cloud_node.id] = record
270         self._pair_nodes(record, arvados_node)
271
272     @_check_poll_freshness
273     def stop_booting_node(self):
274         nodes_excess = self._nodes_excess()
275         if (nodes_excess < 1) or not self.booting:
276             return None
277         for key, node in self.booting.iteritems():
278             node.stop_if_no_cloud_node().get()
279             if not node.actor_ref.is_alive():
280                 del self.booting[key]
281                 if nodes_excess > 1:
282                     self._later.stop_booting_node()
283                 break
284
285     @_check_poll_freshness
286     def node_can_shutdown(self, node_actor):
287         if self._nodes_excess() < 1:
288             return None
289         cloud_node, arvados_node = self._actor_nodes(node_actor)
290         if cloud_node.id in self.shutdowns:
291             return None
292         shutdown = self._node_shutdown.start(timer_actor=self._timer,
293                                              cloud_client=self._new_cloud(),
294                                              cloud_node=cloud_node).proxy()
295         self.shutdowns[cloud_node.id] = shutdown
296         shutdown.subscribe(self._later.node_finished_shutdown)
297
298     def node_finished_shutdown(self, shutdown_actor):
299         cloud_node_id = shutdown_actor.cloud_node.get().id
300         shutdown_actor.stop()
301         if cloud_node_id in self.booted:
302             self.booted.pop(cloud_node_id).actor.stop()
303             del self.shutdowns[cloud_node_id]
304
305     def shutdown(self):
306         self._logger.info("Shutting down after signal.")
307         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
308         for bootnode in self.booting.itervalues():
309             bootnode.stop_if_no_cloud_node()
310         self._later.await_shutdown()
311
312     def await_shutdown(self):
313         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
314             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
315         else:
316             self.stop()