6142: Only resume from 'drng' or 'drain'. Add/fix tests.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, min_size, min_nodes, max_nodes,
101                  poll_stale_after=600,
102                  boot_fail_after=1800,
103                  node_stale_after=7200,
104                  node_setup_class=dispatch.ComputeNodeSetupActor,
105                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106                  node_actor_class=dispatch.ComputeNodeMonitorActor):
107         super(NodeManagerDaemonActor, self).__init__()
108         self._node_setup = node_setup_class
109         self._node_shutdown = node_shutdown_class
110         self._node_actor = node_actor_class
111         self._cloud_updater = cloud_update_actor
112         self._timer = timer_actor
113         self._new_arvados = arvados_factory
114         self._new_cloud = cloud_factory
115         self._cloud_driver = self._new_cloud()
116         self._logger = logging.getLogger('arvnodeman.daemon')
117         self._later = self.actor_ref.proxy()
118         self.shutdown_windows = shutdown_windows
119         self.min_cloud_size = min_size
120         self.min_nodes = min_nodes
121         self.max_nodes = max_nodes
122         self.poll_stale_after = poll_stale_after
123         self.boot_fail_after = boot_fail_after
124         self.node_stale_after = node_stale_after
125         self.last_polls = {}
126         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
127             poll_actor = locals()[poll_name + '_actor']
128             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
129             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
130             self.last_polls[poll_name] = -self.poll_stale_after
131         self.cloud_nodes = _CloudNodeTracker()
132         self.arvados_nodes = _ArvadosNodeTracker()
133         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
134         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
135         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
136         self._logger.debug("Daemon initialized")
137
138     def _update_poll_time(self, poll_key):
139         self.last_polls[poll_key] = time.time()
140
141     def _pair_nodes(self, node_record, arvados_node):
142         self._logger.info("Cloud node %s has associated with Arvados node %s",
143                           node_record.cloud_node.id, arvados_node['uuid'])
144         self._arvados_nodes_actor.subscribe_to(
145             arvados_node['uuid'], node_record.actor.update_arvados_node)
146         node_record.arvados_node = arvados_node
147         self.arvados_nodes.add(node_record)
148
149     def _new_node(self, cloud_node):
150         start_time = self._cloud_driver.node_start_time(cloud_node)
151         shutdown_timer = cnode.ShutdownTimer(start_time,
152                                              self.shutdown_windows)
153         actor = self._node_actor.start(
154             cloud_node=cloud_node,
155             cloud_node_start_time=start_time,
156             shutdown_timer=shutdown_timer,
157             cloud_fqdn_func=self._cloud_driver.node_fqdn,
158             update_actor=self._cloud_updater,
159             timer_actor=self._timer,
160             arvados_node=None,
161             poll_stale_after=self.poll_stale_after,
162             node_stale_after=self.node_stale_after,
163             cloud_client=self._cloud_driver,
164             boot_fail_after=self.boot_fail_after).proxy()
165         actor.subscribe(self._later.node_can_shutdown)
166         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
167                                              actor.update_cloud_node)
168         record = _ComputeNodeRecord(actor, cloud_node)
169         return record
170
171     def update_cloud_nodes(self, nodelist):
172         self._update_poll_time('cloud_nodes')
173         for key, node in self.cloud_nodes.update_from(nodelist):
174             self._logger.info("Registering new cloud node %s", key)
175             if key in self.booted:
176                 record = self.booted.pop(key)
177             else:
178                 record = self._new_node(node)
179             self.cloud_nodes.add(record)
180             for arv_rec in self.arvados_nodes.unpaired():
181                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
182                     self._pair_nodes(record, arv_rec.arvados_node)
183                     break
184         for key, record in self.cloud_nodes.orphans.iteritems():
185             record.actor.stop()
186             record.cloud_node = None
187             self.shutdowns.pop(key, None)
188
189     def update_arvados_nodes(self, nodelist):
190         self._update_poll_time('arvados_nodes')
191         for key, node in self.arvados_nodes.update_from(nodelist):
192             self._logger.info("Registering new Arvados node %s", key)
193             record = _ComputeNodeRecord(arvados_node=node)
194             self.arvados_nodes.add(record)
195         for arv_rec in self.arvados_nodes.unpaired():
196             arv_node = arv_rec.arvados_node
197             for cloud_rec in self.cloud_nodes.unpaired():
198                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
199                     self._pair_nodes(cloud_rec, arv_node)
200                     break
201
202     def _nodes_up(self):
203         return sum(len(nodelist) for nodelist in
204                    [self.cloud_nodes, self.booted, self.booting])
205
206     def _nodes_busy(self):
207         return sum(1 for busy in
208                    pykka.get_all(rec.actor.in_state('busy') for rec in
209                                  self.cloud_nodes.nodes.itervalues())
210                    if busy)
211
212     def _nodes_missing(self):
213         return sum(1 for arv_node in
214                    pykka.get_all(rec.actor.arvados_node for rec in
215                                  self.cloud_nodes.nodes.itervalues()
216                                  if rec.actor.cloud_node.get().id not in self.shutdowns)
217                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
218
219     def _nodes_wanted(self):
220         up_count = self._nodes_up()
221         under_min = self.min_nodes - up_count
222         over_max = up_count - self.max_nodes
223         if over_max >= 0:
224             return -over_max
225         elif under_min > 0:
226             return under_min
227         else:
228             up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
229             return len(self.last_wishlist) - up_count
230
231     def _nodes_excess(self):
232         up_count = self._nodes_up() - len(self.shutdowns)
233         over_min = up_count - self.min_nodes
234         if over_min <= 0:
235             return over_min
236         else:
237             return up_count - self._nodes_busy() - len(self.last_wishlist)
238
239     def update_server_wishlist(self, wishlist):
240         self._update_poll_time('server_wishlist')
241         self.last_wishlist = wishlist
242         nodes_wanted = self._nodes_wanted()
243         if nodes_wanted > 0:
244             self._later.start_node()
245         elif (nodes_wanted < 0) and self.booting:
246             self._later.stop_booting_node()
247
248     def _check_poll_freshness(orig_func):
249         """Decorator to inhibit a method when poll information is stale.
250
251         This decorator checks the timestamps of all the poll information the
252         daemon has received.  The decorated method is only called if none
253         of the timestamps are considered stale.
254         """
255         @functools.wraps(orig_func)
256         def wrapper(self, *args, **kwargs):
257             now = time.time()
258             if all(now - t < self.poll_stale_after
259                    for t in self.last_polls.itervalues()):
260                 return orig_func(self, *args, **kwargs)
261             else:
262                 return None
263         return wrapper
264
265     @_check_poll_freshness
266     def start_node(self):
267         nodes_wanted = self._nodes_wanted()
268         if nodes_wanted < 1:
269             return None
270         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
271         try:
272             cloud_size = self.last_wishlist[self._nodes_up()]
273         except IndexError:
274             cloud_size = self.min_cloud_size
275         self._logger.info("Want %s more nodes.  Booting a %s node.",
276                           nodes_wanted, cloud_size.name)
277         new_setup = self._node_setup.start(
278             timer_actor=self._timer,
279             arvados_client=self._new_arvados(),
280             arvados_node=arvados_node,
281             cloud_client=self._new_cloud(),
282             cloud_size=cloud_size).proxy()
283         self.booting[new_setup.actor_ref.actor_urn] = new_setup
284         if arvados_node is not None:
285             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
286                 time.time())
287         new_setup.subscribe(self._later.node_up)
288         if nodes_wanted > 1:
289             self._later.start_node()
290
291     def _get_actor_attrs(self, actor, *attr_names):
292         return pykka.get_all([getattr(actor, name) for name in attr_names])
293
294     def node_up(self, setup_proxy):
295         cloud_node = setup_proxy.cloud_node.get()
296         del self.booting[setup_proxy.actor_ref.actor_urn]
297         setup_proxy.stop()
298         record = self.cloud_nodes.get(cloud_node.id)
299         if record is None:
300             record = self._new_node(cloud_node)
301             self.booted[cloud_node.id] = record
302         self._timer.schedule(time.time() + self.boot_fail_after,
303                              self._later.shutdown_unpaired_node, cloud_node.id)
304
305     @_check_poll_freshness
306     def stop_booting_node(self):
307         nodes_excess = self._nodes_excess()
308         if (nodes_excess < 1) or not self.booting:
309             return None
310         for key, node in self.booting.iteritems():
311             if node.stop_if_no_cloud_node().get():
312                 del self.booting[key]
313                 if nodes_excess > 1:
314                     self._later.stop_booting_node()
315                 break
316
317     def _begin_node_shutdown(self, node_actor, cancellable):
318         cloud_node_id = node_actor.cloud_node.get().id
319         if cloud_node_id in self.shutdowns:
320             return None
321         shutdown = self._node_shutdown.start(
322             timer_actor=self._timer, cloud_client=self._new_cloud(),
323             arvados_client=self._new_arvados(),
324             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
325         self.shutdowns[cloud_node_id] = shutdown
326         shutdown.subscribe(self._later.node_finished_shutdown)
327
328     @_check_poll_freshness
329     def node_can_shutdown(self, node_actor):
330         if self._nodes_excess() > 0:
331             self._begin_node_shutdown(node_actor, cancellable=True)
332
333     def shutdown_unpaired_node(self, cloud_node_id):
334         for record_dict in [self.cloud_nodes, self.booted]:
335             if cloud_node_id in record_dict:
336                 record = record_dict[cloud_node_id]
337                 break
338         else:
339             return None
340         if not record.actor.in_state('idle', 'busy').get():
341             self._begin_node_shutdown(record.actor, cancellable=False)
342
343     def node_finished_shutdown(self, shutdown_actor):
344         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
345                                                     'cloud_node')
346         shutdown_actor.stop()
347         cloud_node_id = cloud_node.id
348         if not success:
349             del self.shutdowns[cloud_node_id]
350         elif cloud_node_id in self.booted:
351             self.booted.pop(cloud_node_id).actor.stop()
352             del self.shutdowns[cloud_node_id]
353
354     def shutdown(self):
355         self._logger.info("Shutting down after signal.")
356         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
357         setup_stops = {key: node.stop_if_no_cloud_node()
358                        for key, node in self.booting.iteritems()}
359         self.booting = {key: self.booting[key]
360                         for key in setup_stops if not setup_stops[key].get()}
361         self._later.await_shutdown()
362
363     def await_shutdown(self):
364         if self.booting:
365             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
366         else:
367             self.stop()