Add docs to Node Manager's base compute node driver.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, min_size, min_nodes, max_nodes,
101                  poll_stale_after=600,
102                  boot_fail_after=1800,
103                  node_stale_after=7200,
104                  node_setup_class=dispatch.ComputeNodeSetupActor,
105                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106                  node_actor_class=dispatch.ComputeNodeMonitorActor):
107         super(NodeManagerDaemonActor, self).__init__()
108         self._node_setup = node_setup_class
109         self._node_shutdown = node_shutdown_class
110         self._node_actor = node_actor_class
111         self._cloud_updater = cloud_update_actor
112         self._timer = timer_actor
113         self._new_arvados = arvados_factory
114         self._new_cloud = cloud_factory
115         self._cloud_driver = self._new_cloud()
116         self._logger = logging.getLogger('arvnodeman.daemon')
117         self._later = self.actor_ref.proxy()
118         self.shutdown_windows = shutdown_windows
119         self.min_cloud_size = min_size
120         self.min_nodes = min_nodes
121         self.max_nodes = max_nodes
122         self.poll_stale_after = poll_stale_after
123         self.boot_fail_after = boot_fail_after
124         self.node_stale_after = node_stale_after
125         self.last_polls = {}
126         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127             poll_actor = locals()[poll_name + '_actor']
128             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130             self.last_polls[poll_name] = -self.poll_stale_after
131         self.cloud_nodes = _CloudNodeTracker()
132         self.arvados_nodes = _ArvadosNodeTracker()
133         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
134         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
135         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
136         self._logger.debug("Daemon initialized")
137
138     def _update_poll_time(self, poll_key):
139         self.last_polls[poll_key] = time.time()
140
141     def _pair_nodes(self, node_record, arvados_node):
142         self._logger.info("Cloud node %s has associated with Arvados node %s",
143                           node_record.cloud_node.id, arvados_node['uuid'])
144         self._arvados_nodes_actor.subscribe_to(
145             arvados_node['uuid'], node_record.actor.update_arvados_node)
146         node_record.arvados_node = arvados_node
147         self.arvados_nodes.add(node_record)
148
149     def _new_node(self, cloud_node):
150         start_time = self._cloud_driver.node_start_time(cloud_node)
151         shutdown_timer = cnode.ShutdownTimer(start_time,
152                                              self.shutdown_windows)
153         actor = self._node_actor.start(
154             cloud_node=cloud_node,
155             cloud_node_start_time=start_time,
156             shutdown_timer=shutdown_timer,
157             cloud_fqdn_func=self._cloud_driver.node_fqdn,
158             update_actor=self._cloud_updater,
159             timer_actor=self._timer,
160             arvados_node=None,
161             poll_stale_after=self.poll_stale_after,
162             node_stale_after=self.node_stale_after).proxy()
163         actor.subscribe(self._later.node_can_shutdown)
164         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
165                                              actor.update_cloud_node)
166         record = _ComputeNodeRecord(actor, cloud_node)
167         return record
168
169     def update_cloud_nodes(self, nodelist):
170         self._update_poll_time('cloud_nodes')
171         for key, node in self.cloud_nodes.update_from(nodelist):
172             self._logger.info("Registering new cloud node %s", key)
173             if key in self.booted:
174                 record = self.booted.pop(key)
175             else:
176                 record = self._new_node(node)
177             self.cloud_nodes.add(record)
178             for arv_rec in self.arvados_nodes.unpaired():
179                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
180                     self._pair_nodes(record, arv_rec.arvados_node)
181                     break
182         for key, record in self.cloud_nodes.orphans.iteritems():
183             record.actor.stop()
184             record.cloud_node = None
185             self.shutdowns.pop(key, None)
186
187     def update_arvados_nodes(self, nodelist):
188         self._update_poll_time('arvados_nodes')
189         for key, node in self.arvados_nodes.update_from(nodelist):
190             self._logger.info("Registering new Arvados node %s", key)
191             record = _ComputeNodeRecord(arvados_node=node)
192             self.arvados_nodes.add(record)
193         for arv_rec in self.arvados_nodes.unpaired():
194             arv_node = arv_rec.arvados_node
195             for cloud_rec in self.cloud_nodes.unpaired():
196                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
197                     self._pair_nodes(cloud_rec, arv_node)
198                     break
199
200     def _nodes_up(self):
201         return sum(len(nodelist) for nodelist in
202                    [self.cloud_nodes, self.booted, self.booting])
203
204     def _nodes_busy(self):
205         return sum(1 for busy in
206                    pykka.get_all(rec.actor.in_state('busy') for rec in
207                                  self.cloud_nodes.nodes.itervalues())
208                    if busy)
209
210     def _nodes_wanted(self):
211         up_count = self._nodes_up()
212         under_min = self.min_nodes - up_count
213         over_max = up_count - self.max_nodes
214         if over_max >= 0:
215             return -over_max
216         elif under_min > 0:
217             return under_min
218         else:
219             up_count -= len(self.shutdowns) + self._nodes_busy()
220             return len(self.last_wishlist) - up_count
221
222     def _nodes_excess(self):
223         up_count = self._nodes_up() - len(self.shutdowns)
224         over_min = up_count - self.min_nodes
225         if over_min <= 0:
226             return over_min
227         else:
228             return up_count - self._nodes_busy() - len(self.last_wishlist)
229
230     def update_server_wishlist(self, wishlist):
231         self._update_poll_time('server_wishlist')
232         self.last_wishlist = wishlist
233         nodes_wanted = self._nodes_wanted()
234         if nodes_wanted > 0:
235             self._later.start_node()
236         elif (nodes_wanted < 0) and self.booting:
237             self._later.stop_booting_node()
238
239     def _check_poll_freshness(orig_func):
240         """Decorator to inhibit a method when poll information is stale.
241
242         This decorator checks the timestamps of all the poll information the
243         daemon has received.  The decorated method is only called if none
244         of the timestamps are considered stale.
245         """
246         @functools.wraps(orig_func)
247         def wrapper(self, *args, **kwargs):
248             now = time.time()
249             if all(now - t < self.poll_stale_after
250                    for t in self.last_polls.itervalues()):
251                 return orig_func(self, *args, **kwargs)
252             else:
253                 return None
254         return wrapper
255
256     @_check_poll_freshness
257     def start_node(self):
258         nodes_wanted = self._nodes_wanted()
259         if nodes_wanted < 1:
260             return None
261         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
262         try:
263             cloud_size = self.last_wishlist[self._nodes_up()]
264         except IndexError:
265             cloud_size = self.min_cloud_size
266         self._logger.info("Want %s more nodes.  Booting a %s node.",
267                           nodes_wanted, cloud_size.name)
268         new_setup = self._node_setup.start(
269             timer_actor=self._timer,
270             arvados_client=self._new_arvados(),
271             arvados_node=arvados_node,
272             cloud_client=self._new_cloud(),
273             cloud_size=cloud_size).proxy()
274         self.booting[new_setup.actor_ref.actor_urn] = new_setup
275         if arvados_node is not None:
276             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
277                 time.time())
278         new_setup.subscribe(self._later.node_up)
279         if nodes_wanted > 1:
280             self._later.start_node()
281
282     def _get_actor_attrs(self, actor, *attr_names):
283         return pykka.get_all([getattr(actor, name) for name in attr_names])
284
285     def node_up(self, setup_proxy):
286         cloud_node = setup_proxy.cloud_node.get()
287         del self.booting[setup_proxy.actor_ref.actor_urn]
288         setup_proxy.stop()
289         record = self.cloud_nodes.get(cloud_node.id)
290         if record is None:
291             record = self._new_node(cloud_node)
292             self.booted[cloud_node.id] = record
293         self._timer.schedule(time.time() + self.boot_fail_after,
294                              self._later.shutdown_unpaired_node, cloud_node.id)
295
296     @_check_poll_freshness
297     def stop_booting_node(self):
298         nodes_excess = self._nodes_excess()
299         if (nodes_excess < 1) or not self.booting:
300             return None
301         for key, node in self.booting.iteritems():
302             if node.stop_if_no_cloud_node().get():
303                 del self.booting[key]
304                 if nodes_excess > 1:
305                     self._later.stop_booting_node()
306                 break
307
308     def _begin_node_shutdown(self, node_actor, cancellable):
309         cloud_node_id = node_actor.cloud_node.get().id
310         if cloud_node_id in self.shutdowns:
311             return None
312         shutdown = self._node_shutdown.start(
313             timer_actor=self._timer, cloud_client=self._new_cloud(),
314             arvados_client=self._new_arvados(),
315             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
316         self.shutdowns[cloud_node_id] = shutdown
317         shutdown.subscribe(self._later.node_finished_shutdown)
318
319     @_check_poll_freshness
320     def node_can_shutdown(self, node_actor):
321         if self._nodes_excess() > 0:
322             self._begin_node_shutdown(node_actor, cancellable=True)
323
324     def shutdown_unpaired_node(self, cloud_node_id):
325         for record_dict in [self.cloud_nodes, self.booted]:
326             if cloud_node_id in record_dict:
327                 record = record_dict[cloud_node_id]
328                 break
329         else:
330             return None
331         if not record.actor.in_state('idle', 'busy').get():
332             self._begin_node_shutdown(record.actor, cancellable=False)
333
334     def node_finished_shutdown(self, shutdown_actor):
335         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
336                                                     'cloud_node')
337         shutdown_actor.stop()
338         cloud_node_id = cloud_node.id
339         if not success:
340             del self.shutdowns[cloud_node_id]
341         elif cloud_node_id in self.booted:
342             self.booted.pop(cloud_node_id).actor.stop()
343             del self.shutdowns[cloud_node_id]
344
345     def shutdown(self):
346         self._logger.info("Shutting down after signal.")
347         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
348         setup_stops = {key: node.stop_if_no_cloud_node()
349                        for key, node in self.booting.iteritems()}
350         self.booting = {key: self.booting[key]
351                         for key in setup_stops if not setup_stops[key].get()}
352         self._later.await_shutdown()
353
354     def await_shutdown(self):
355         if self.booting:
356             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
357         else:
358             self.stop()