3824: Merge branch 'master' into 3824-docker-fixes
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     def __getitem__(self, key):
30         return self.nodes[key]
31
32     def __len__(self):
33         return len(self.nodes)
34
35     def get(self, key, default=None):
36         return self.nodes.get(key, default)
37
38     def record_key(self, record):
39         return self.item_key(getattr(record, self.RECORD_ATTR))
40
41     def add(self, record):
42         self.nodes[self.record_key(record)] = record
43
44     def update_record(self, key, item):
45         setattr(self.nodes[key], self.RECORD_ATTR, item)
46
47     def update_from(self, response):
48         unseen = set(self.nodes.iterkeys())
49         for item in response:
50             key = self.item_key(item)
51             if key in unseen:
52                 unseen.remove(key)
53                 self.update_record(key, item)
54             else:
55                 yield key, item
56         self.orphans = {key: self.nodes.pop(key) for key in unseen}
57
58     def unpaired(self):
59         return (record for record in self.nodes.itervalues()
60                 if getattr(record, self.PAIR_ATTR) is None)
61
62
63 class _CloudNodeTracker(_BaseNodeTracker):
64     RECORD_ATTR = 'cloud_node'
65     PAIR_ATTR = 'arvados_node'
66     item_key = staticmethod(lambda cloud_node: cloud_node.id)
67
68
69 class _ArvadosNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'arvados_node'
71     PAIR_ATTR = 'cloud_node'
72     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
73
74     def find_stale_node(self, stale_time):
75         for record in self.nodes.itervalues():
76             node = record.arvados_node
77             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
78                                           stale_time) and
79                   not cnode.timestamp_fresh(record.assignment_time,
80                                             stale_time)):
81                 return node
82         return None
83
84
85 class NodeManagerDaemonActor(actor_class):
86     """Node Manager daemon.
87
88     This actor subscribes to all information polls about cloud nodes,
89     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
90     for every cloud node, subscribing them to poll updates
91     appropriately.  It creates and destroys cloud nodes based on job queue
92     demand, and stops the corresponding ComputeNode actors when their work
93     is done.
94     """
95     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
96                  cloud_nodes_actor, cloud_update_actor, timer_actor,
97                  arvados_factory, cloud_factory,
98                  shutdown_windows, min_nodes, max_nodes,
99                  poll_stale_after=600, node_stale_after=7200,
100                  node_setup_class=dispatch.ComputeNodeSetupActor,
101                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
102                  node_actor_class=dispatch.ComputeNodeMonitorActor):
103         super(NodeManagerDaemonActor, self).__init__()
104         self._node_setup = node_setup_class
105         self._node_shutdown = node_shutdown_class
106         self._node_actor = node_actor_class
107         self._cloud_updater = cloud_update_actor
108         self._timer = timer_actor
109         self._new_arvados = arvados_factory
110         self._new_cloud = cloud_factory
111         self._cloud_driver = self._new_cloud()
112         self._logger = logging.getLogger('arvnodeman.daemon')
113         self._later = self.actor_ref.proxy()
114         self.shutdown_windows = shutdown_windows
115         self.min_nodes = min_nodes
116         self.max_nodes = max_nodes
117         self.poll_stale_after = poll_stale_after
118         self.node_stale_after = node_stale_after
119         self.last_polls = {}
120         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
121             poll_actor = locals()[poll_name + '_actor']
122             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
123             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
124             self.last_polls[poll_name] = -self.poll_stale_after
125         self.cloud_nodes = _CloudNodeTracker()
126         self.arvados_nodes = _ArvadosNodeTracker()
127         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
128         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
129         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
130         self._logger.debug("Daemon initialized")
131
132     def _update_poll_time(self, poll_key):
133         self.last_polls[poll_key] = time.time()
134
135     def _pair_nodes(self, node_record, arvados_node):
136         self._logger.info("Cloud node %s has associated with Arvados node %s",
137                           node_record.cloud_node.id, arvados_node['uuid'])
138         self._arvados_nodes_actor.subscribe_to(
139             arvados_node['uuid'], node_record.actor.update_arvados_node)
140         node_record.arvados_node = arvados_node
141         self.arvados_nodes.add(node_record)
142
143     def _new_node(self, cloud_node):
144         start_time = self._cloud_driver.node_start_time(cloud_node)
145         shutdown_timer = cnode.ShutdownTimer(start_time,
146                                              self.shutdown_windows)
147         actor = self._node_actor.start(
148             cloud_node=cloud_node,
149             cloud_node_start_time=start_time,
150             shutdown_timer=shutdown_timer,
151             update_actor=self._cloud_updater,
152             timer_actor=self._timer,
153             arvados_node=None,
154             poll_stale_after=self.poll_stale_after,
155             node_stale_after=self.node_stale_after).proxy()
156         actor.subscribe(self._later.node_can_shutdown)
157         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
158                                              actor.update_cloud_node)
159         record = _ComputeNodeRecord(actor, cloud_node)
160         return record
161
162     def update_cloud_nodes(self, nodelist):
163         self._update_poll_time('cloud_nodes')
164         for key, node in self.cloud_nodes.update_from(nodelist):
165             self._logger.info("Registering new cloud node %s", key)
166             if key in self.booted:
167                 record = self.booted.pop(key)
168             else:
169                 record = self._new_node(node)
170             self.cloud_nodes.add(record)
171             for arv_rec in self.arvados_nodes.unpaired():
172                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
173                     self._pair_nodes(record, arv_rec.arvados_node)
174                     break
175         for key, record in self.cloud_nodes.orphans.iteritems():
176             record.actor.stop()
177             self.shutdowns.pop(key, None)
178
179     def update_arvados_nodes(self, nodelist):
180         self._update_poll_time('arvados_nodes')
181         for key, node in self.arvados_nodes.update_from(nodelist):
182             self._logger.info("Registering new Arvados node %s", key)
183             record = _ComputeNodeRecord(arvados_node=node)
184             self.arvados_nodes.add(record)
185         for arv_rec in self.arvados_nodes.unpaired():
186             arv_node = arv_rec.arvados_node
187             for cloud_rec in self.cloud_nodes.unpaired():
188                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
189                     self._pair_nodes(cloud_rec, arv_node)
190                     break
191
192     def _nodes_up(self):
193         return sum(len(nodelist) for nodelist in
194                    [self.cloud_nodes, self.booted, self.booting])
195
196     def _nodes_busy(self):
197         return sum(1 for idle in
198                    pykka.get_all(rec.actor.in_state('idle') for rec in
199                                  self.cloud_nodes.nodes.itervalues())
200                    if idle is False)
201
202     def _nodes_wanted(self):
203         up_count = self._nodes_up()
204         over_max = up_count - self.max_nodes
205         if over_max >= 0:
206             return -over_max
207         else:
208             up_count -= len(self.shutdowns) + self._nodes_busy()
209             return len(self.last_wishlist) - up_count
210
211     def _nodes_excess(self):
212         up_count = self._nodes_up() - len(self.shutdowns)
213         over_min = up_count - self.min_nodes
214         if over_min <= 0:
215             return over_min
216         else:
217             return up_count - self._nodes_busy() - len(self.last_wishlist)
218
219     def update_server_wishlist(self, wishlist):
220         self._update_poll_time('server_wishlist')
221         self.last_wishlist = wishlist
222         nodes_wanted = self._nodes_wanted()
223         if nodes_wanted > 0:
224             self._later.start_node()
225         elif (nodes_wanted < 0) and self.booting:
226             self._later.stop_booting_node()
227
228     def _check_poll_freshness(orig_func):
229         """Decorator to inhibit a method when poll information is stale.
230
231         This decorator checks the timestamps of all the poll information the
232         daemon has received.  The decorated method is only called if none
233         of the timestamps are considered stale.
234         """
235         @functools.wraps(orig_func)
236         def wrapper(self, *args, **kwargs):
237             now = time.time()
238             if all(now - t < self.poll_stale_after
239                    for t in self.last_polls.itervalues()):
240                 return orig_func(self, *args, **kwargs)
241             else:
242                 return None
243         return wrapper
244
245     @_check_poll_freshness
246     def start_node(self):
247         nodes_wanted = self._nodes_wanted()
248         if nodes_wanted < 1:
249             return None
250         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
251         cloud_size = self.last_wishlist[nodes_wanted - 1]
252         self._logger.info("Want %s more nodes.  Booting a %s node.",
253                           nodes_wanted, cloud_size.name)
254         new_setup = self._node_setup.start(
255             timer_actor=self._timer,
256             arvados_client=self._new_arvados(),
257             arvados_node=arvados_node,
258             cloud_client=self._new_cloud(),
259             cloud_size=cloud_size).proxy()
260         self.booting[new_setup.actor_ref.actor_urn] = new_setup
261         if arvados_node is not None:
262             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
263                 time.time())
264         new_setup.subscribe(self._later.node_up)
265         if nodes_wanted > 1:
266             self._later.start_node()
267
268     def _get_actor_attrs(self, actor, *attr_names):
269         return pykka.get_all([getattr(actor, name) for name in attr_names])
270
271     def node_up(self, setup_proxy):
272         cloud_node, arvados_node = self._get_actor_attrs(
273             setup_proxy, 'cloud_node', 'arvados_node')
274         del self.booting[setup_proxy.actor_ref.actor_urn]
275         setup_proxy.stop()
276         record = self.cloud_nodes.get(cloud_node.id)
277         if record is None:
278             record = self._new_node(cloud_node)
279             self.booted[cloud_node.id] = record
280         self._pair_nodes(record, arvados_node)
281
282     @_check_poll_freshness
283     def stop_booting_node(self):
284         nodes_excess = self._nodes_excess()
285         if (nodes_excess < 1) or not self.booting:
286             return None
287         for key, node in self.booting.iteritems():
288             node.stop_if_no_cloud_node().get()
289             if not node.actor_ref.is_alive():
290                 del self.booting[key]
291                 if nodes_excess > 1:
292                     self._later.stop_booting_node()
293                 break
294
295     @_check_poll_freshness
296     def node_can_shutdown(self, node_actor):
297         if self._nodes_excess() < 1:
298             return None
299         cloud_node_id = node_actor.cloud_node.get().id
300         if cloud_node_id in self.shutdowns:
301             return None
302         shutdown = self._node_shutdown.start(
303             timer_actor=self._timer, cloud_client=self._new_cloud(),
304             node_monitor=node_actor.actor_ref).proxy()
305         self.shutdowns[cloud_node_id] = shutdown
306         shutdown.subscribe(self._later.node_finished_shutdown)
307
308     def node_finished_shutdown(self, shutdown_actor):
309         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
310                                                     'cloud_node')
311         shutdown_actor.stop()
312         cloud_node_id = cloud_node.id
313         if not success:
314             del self.shutdowns[cloud_node_id]
315         elif cloud_node_id in self.booted:
316             self.booted.pop(cloud_node_id).actor.stop()
317             del self.shutdowns[cloud_node_id]
318
319     def shutdown(self):
320         self._logger.info("Shutting down after signal.")
321         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
322         for bootnode in self.booting.itervalues():
323             bootnode.stop_if_no_cloud_node()
324         self._later.await_shutdown()
325
326     def await_shutdown(self):
327         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
328             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
329         else:
330             self.stop()