Merge branch 'master' into 4358-graph-not-comparing
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, min_nodes, max_nodes,
101                  poll_stale_after=600,
102                  boot_fail_after=1800,
103                  node_stale_after=7200,
104                  node_setup_class=dispatch.ComputeNodeSetupActor,
105                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
106                  node_actor_class=dispatch.ComputeNodeMonitorActor):
107         super(NodeManagerDaemonActor, self).__init__()
108         self._node_setup = node_setup_class
109         self._node_shutdown = node_shutdown_class
110         self._node_actor = node_actor_class
111         self._cloud_updater = cloud_update_actor
112         self._timer = timer_actor
113         self._new_arvados = arvados_factory
114         self._new_cloud = cloud_factory
115         self._cloud_driver = self._new_cloud()
116         self._logger = logging.getLogger('arvnodeman.daemon')
117         self._later = self.actor_ref.proxy()
118         self.shutdown_windows = shutdown_windows
119         self.min_nodes = min_nodes
120         self.max_nodes = max_nodes
121         self.poll_stale_after = poll_stale_after
122         self.boot_fail_after = boot_fail_after
123         self.node_stale_after = node_stale_after
124         self.last_polls = {}
125         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
126             poll_actor = locals()[poll_name + '_actor']
127             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
128             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
129             self.last_polls[poll_name] = -self.poll_stale_after
130         self.cloud_nodes = _CloudNodeTracker()
131         self.arvados_nodes = _ArvadosNodeTracker()
132         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
133         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
134         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
135         self._logger.debug("Daemon initialized")
136
137     def _update_poll_time(self, poll_key):
138         self.last_polls[poll_key] = time.time()
139
140     def _pair_nodes(self, node_record, arvados_node):
141         self._logger.info("Cloud node %s has associated with Arvados node %s",
142                           node_record.cloud_node.id, arvados_node['uuid'])
143         self._arvados_nodes_actor.subscribe_to(
144             arvados_node['uuid'], node_record.actor.update_arvados_node)
145         node_record.arvados_node = arvados_node
146         self.arvados_nodes.add(node_record)
147
148     def _new_node(self, cloud_node):
149         start_time = self._cloud_driver.node_start_time(cloud_node)
150         shutdown_timer = cnode.ShutdownTimer(start_time,
151                                              self.shutdown_windows)
152         actor = self._node_actor.start(
153             cloud_node=cloud_node,
154             cloud_node_start_time=start_time,
155             shutdown_timer=shutdown_timer,
156             update_actor=self._cloud_updater,
157             timer_actor=self._timer,
158             arvados_node=None,
159             poll_stale_after=self.poll_stale_after,
160             node_stale_after=self.node_stale_after).proxy()
161         actor.subscribe(self._later.node_can_shutdown)
162         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
163                                              actor.update_cloud_node)
164         record = _ComputeNodeRecord(actor, cloud_node)
165         return record
166
167     def update_cloud_nodes(self, nodelist):
168         self._update_poll_time('cloud_nodes')
169         for key, node in self.cloud_nodes.update_from(nodelist):
170             self._logger.info("Registering new cloud node %s", key)
171             if key in self.booted:
172                 record = self.booted.pop(key)
173             else:
174                 record = self._new_node(node)
175             self.cloud_nodes.add(record)
176             for arv_rec in self.arvados_nodes.unpaired():
177                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
178                     self._pair_nodes(record, arv_rec.arvados_node)
179                     break
180         for key, record in self.cloud_nodes.orphans.iteritems():
181             record.actor.stop()
182             record.cloud_node = None
183             self.shutdowns.pop(key, None)
184
185     def update_arvados_nodes(self, nodelist):
186         self._update_poll_time('arvados_nodes')
187         for key, node in self.arvados_nodes.update_from(nodelist):
188             self._logger.info("Registering new Arvados node %s", key)
189             record = _ComputeNodeRecord(arvados_node=node)
190             self.arvados_nodes.add(record)
191         for arv_rec in self.arvados_nodes.unpaired():
192             arv_node = arv_rec.arvados_node
193             for cloud_rec in self.cloud_nodes.unpaired():
194                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
195                     self._pair_nodes(cloud_rec, arv_node)
196                     break
197
198     def _nodes_up(self):
199         return sum(len(nodelist) for nodelist in
200                    [self.cloud_nodes, self.booted, self.booting])
201
202     def _nodes_busy(self):
203         return sum(1 for idle in
204                    pykka.get_all(rec.actor.in_state('idle') for rec in
205                                  self.cloud_nodes.nodes.itervalues())
206                    if idle is False)
207
208     def _nodes_wanted(self):
209         up_count = self._nodes_up()
210         over_max = up_count - self.max_nodes
211         if over_max >= 0:
212             return -over_max
213         else:
214             up_count -= len(self.shutdowns) + self._nodes_busy()
215             return len(self.last_wishlist) - up_count
216
217     def _nodes_excess(self):
218         up_count = self._nodes_up() - len(self.shutdowns)
219         over_min = up_count - self.min_nodes
220         if over_min <= 0:
221             return over_min
222         else:
223             return up_count - self._nodes_busy() - len(self.last_wishlist)
224
225     def update_server_wishlist(self, wishlist):
226         self._update_poll_time('server_wishlist')
227         self.last_wishlist = wishlist
228         nodes_wanted = self._nodes_wanted()
229         if nodes_wanted > 0:
230             self._later.start_node()
231         elif (nodes_wanted < 0) and self.booting:
232             self._later.stop_booting_node()
233
234     def _check_poll_freshness(orig_func):
235         """Decorator to inhibit a method when poll information is stale.
236
237         This decorator checks the timestamps of all the poll information the
238         daemon has received.  The decorated method is only called if none
239         of the timestamps are considered stale.
240         """
241         @functools.wraps(orig_func)
242         def wrapper(self, *args, **kwargs):
243             now = time.time()
244             if all(now - t < self.poll_stale_after
245                    for t in self.last_polls.itervalues()):
246                 return orig_func(self, *args, **kwargs)
247             else:
248                 return None
249         return wrapper
250
251     @_check_poll_freshness
252     def start_node(self):
253         nodes_wanted = self._nodes_wanted()
254         if nodes_wanted < 1:
255             return None
256         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
257         cloud_size = self.last_wishlist[nodes_wanted - 1]
258         self._logger.info("Want %s more nodes.  Booting a %s node.",
259                           nodes_wanted, cloud_size.name)
260         new_setup = self._node_setup.start(
261             timer_actor=self._timer,
262             arvados_client=self._new_arvados(),
263             arvados_node=arvados_node,
264             cloud_client=self._new_cloud(),
265             cloud_size=cloud_size).proxy()
266         self.booting[new_setup.actor_ref.actor_urn] = new_setup
267         if arvados_node is not None:
268             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
269                 time.time())
270         new_setup.subscribe(self._later.node_up)
271         if nodes_wanted > 1:
272             self._later.start_node()
273
274     def _get_actor_attrs(self, actor, *attr_names):
275         return pykka.get_all([getattr(actor, name) for name in attr_names])
276
277     def node_up(self, setup_proxy):
278         cloud_node = setup_proxy.cloud_node.get()
279         del self.booting[setup_proxy.actor_ref.actor_urn]
280         setup_proxy.stop()
281         record = self.cloud_nodes.get(cloud_node.id)
282         if record is None:
283             record = self._new_node(cloud_node)
284             self.booted[cloud_node.id] = record
285         self._timer.schedule(time.time() + self.boot_fail_after,
286                              self._later.shutdown_unpaired_node, cloud_node.id)
287
288     @_check_poll_freshness
289     def stop_booting_node(self):
290         nodes_excess = self._nodes_excess()
291         if (nodes_excess < 1) or not self.booting:
292             return None
293         for key, node in self.booting.iteritems():
294             node.stop_if_no_cloud_node().get()
295             if not node.actor_ref.is_alive():
296                 del self.booting[key]
297                 if nodes_excess > 1:
298                     self._later.stop_booting_node()
299                 break
300
301     def _begin_node_shutdown(self, node_actor, cancellable):
302         cloud_node_id = node_actor.cloud_node.get().id
303         if cloud_node_id in self.shutdowns:
304             return None
305         shutdown = self._node_shutdown.start(
306             timer_actor=self._timer, cloud_client=self._new_cloud(),
307             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
308         self.shutdowns[cloud_node_id] = shutdown
309         shutdown.subscribe(self._later.node_finished_shutdown)
310
311     @_check_poll_freshness
312     def node_can_shutdown(self, node_actor):
313         if self._nodes_excess() > 0:
314             self._begin_node_shutdown(node_actor, cancellable=True)
315
316     def shutdown_unpaired_node(self, cloud_node_id):
317         for record_dict in [self.cloud_nodes, self.booted]:
318             if cloud_node_id in record_dict:
319                 record = record_dict[cloud_node_id]
320                 break
321         else:
322             return None
323         if record.arvados_node is None:
324             self._begin_node_shutdown(record.actor, cancellable=False)
325
326     def node_finished_shutdown(self, shutdown_actor):
327         success, cloud_node = self._get_actor_attrs(shutdown_actor, 'success',
328                                                     'cloud_node')
329         shutdown_actor.stop()
330         cloud_node_id = cloud_node.id
331         if not success:
332             del self.shutdowns[cloud_node_id]
333         elif cloud_node_id in self.booted:
334             self.booted.pop(cloud_node_id).actor.stop()
335             del self.shutdowns[cloud_node_id]
336
337     def shutdown(self):
338         self._logger.info("Shutting down after signal.")
339         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
340         for bootnode in self.booting.itervalues():
341             bootnode.stop_if_no_cloud_node()
342         self._later.await_shutdown()
343
344     def await_shutdown(self):
345         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
346             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
347         else:
348             self.stop()