65fddd29ec54df34fdc7bcbc008447e237e1f89a
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
class _ComputeNodeRecord(object):
    """Plain record tying together the pieces known about one compute node.

    All four fields are optional so that a record can be created from
    either side (cloud or Arvados) and completed later.
    """
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        # assignment_time defaults to negative infinity, i.e. earlier than
        # any real timestamp it may be compared with.
        (self.actor,
         self.cloud_node,
         self.arvados_node,
         self.assignment_time) = (actor, cloud_node, arvados_node,
                                  assignment_time)
22
23
class _BaseNodeTracker(object):
    """Keyed collection of node records, refreshed from poll responses.

    Subclasses must provide:
      * RECORD_ATTR -- name of the record attribute holding the tracked item
      * PAIR_ATTR -- name of the record attribute holding its counterpart
      * item_key -- callable mapping a tracked item to its dictionary key

    Attributes:
      * nodes -- key -> record for every item currently tracked
      * orphans -- key -> record for items that disappeared in the most
        recent, fully consumed update_from() pass
    """
    def __init__(self):
        self.nodes = {}
        self.orphans = {}
        self._blacklist = set()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        # Only copy __name__ and __doc__: these are the attributes the
        # original code chose to copy from the built-in dict descriptors.
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    # Explicit assignments instead of writing through locals(), so the set
    # of proxied methods is visible to readers and tools.
    __contains__ = _proxy_method('__contains__')
    __getitem__ = _proxy_method('__getitem__')
    __len__ = _proxy_method('__len__')
    get = _proxy_method('get')

    def record_key(self, record):
        """Return the dictionary key for `record`'s tracked item."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Store `record` under its key, replacing any previous record."""
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        """Make update_from() ignore items with this key from now on."""
        self._blacklist.add(key)

    def update_record(self, key, item):
        """Replace the tracked item on the record stored under `key`."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Refresh tracked records from `response`; yield unknown items.

        This is a generator.  For each item in `response`:
          * blacklisted keys are skipped,
          * already-tracked keys get their record's item updated,
          * anything else is yielded as a (key, item) pair for the caller
            to register.

        Only once the generator has been fully consumed are the records
        whose keys did not appear in `response` removed from self.nodes
        and stored in self.orphans.
        """
        # Iterating the dict directly yields its keys on Python 2 and 3.
        unseen = set(self.nodes)
        for item in response:
            key = self.item_key(item)
            if key in self._blacklist:
                continue
            if key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Return an iterator over records whose PAIR_ATTR is None."""
        return (record for record in self.nodes.values()
                if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker whose records are keyed by cloud node id."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the `id` attribute of `cloud_node`."""
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracker whose records are keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return the first tracked Arvados node that is stale, or None.

        A node is returned only when neither its modification time (as
        reported by cnode.arvados_node_mtime) nor its record's
        assignment_time is fresh according to cnode.timestamp_fresh with
        the given `stale_time`.
        """
        # .values() works on both Python 2 and Python 3.
        for record in self.nodes.values():
            node = record.arvados_node
            if cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                     stale_time):
                continue
            # Checked second, and only when the node itself is not fresh,
            # preserving the original short-circuit order.
            if not cnode.timestamp_fresh(record.assignment_time, stale_time):
                return node
        return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor):
114         super(NodeManagerDaemonActor, self).__init__()
115         self._node_setup = node_setup_class
116         self._node_shutdown = node_shutdown_class
117         self._node_actor = node_actor_class
118         self._cloud_updater = cloud_update_actor
119         self._timer = timer_actor
120         self._new_arvados = arvados_factory
121         self._new_cloud = cloud_factory
122         self._cloud_driver = self._new_cloud()
123         self._logger = logging.getLogger('arvnodeman.daemon')
124         self._later = self.actor_ref.proxy()
125         self.shutdown_windows = shutdown_windows
126         self.server_calculator = server_calculator
127         self.min_cloud_size = self.server_calculator.cheapest_size()
128         self.min_nodes = min_nodes
129         self.max_nodes = max_nodes
130         self.poll_stale_after = poll_stale_after
131         self.boot_fail_after = boot_fail_after
132         self.node_stale_after = node_stale_after
133         self.last_polls = {}
134         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
135             poll_actor = locals()[poll_name + '_actor']
136             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
137             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
138             self.last_polls[poll_name] = -self.poll_stale_after
139         self.cloud_nodes = _CloudNodeTracker()
140         self.arvados_nodes = _ArvadosNodeTracker()
141         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
142         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
143         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
144         self._logger.debug("Daemon initialized")
145
146     def _update_poll_time(self, poll_key):
147         self.last_polls[poll_key] = time.time()
148
149     def _pair_nodes(self, node_record, arvados_node):
150         self._logger.info("Cloud node %s has associated with Arvados node %s",
151                           node_record.cloud_node.id, arvados_node['uuid'])
152         self._arvados_nodes_actor.subscribe_to(
153             arvados_node['uuid'], node_record.actor.update_arvados_node)
154         node_record.arvados_node = arvados_node
155         self.arvados_nodes.add(node_record)
156
157     def _new_node(self, cloud_node):
158         start_time = self._cloud_driver.node_start_time(cloud_node)
159         shutdown_timer = cnode.ShutdownTimer(start_time,
160                                              self.shutdown_windows)
161         actor = self._node_actor.start(
162             cloud_node=cloud_node,
163             cloud_node_start_time=start_time,
164             shutdown_timer=shutdown_timer,
165             cloud_fqdn_func=self._cloud_driver.node_fqdn,
166             update_actor=self._cloud_updater,
167             timer_actor=self._timer,
168             arvados_node=None,
169             poll_stale_after=self.poll_stale_after,
170             node_stale_after=self.node_stale_after,
171             cloud_client=self._cloud_driver,
172             boot_fail_after=self.boot_fail_after).proxy()
173         actor.subscribe(self._later.node_can_shutdown)
174         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
175                                              actor.update_cloud_node)
176         record = _ComputeNodeRecord(actor, cloud_node)
177         return record
178
179     def update_cloud_nodes(self, nodelist):
180         self._update_poll_time('cloud_nodes')
181         for key, node in self.cloud_nodes.update_from(nodelist):
182             self._logger.info("Registering new cloud node %s", key)
183             if key in self.booted:
184                 record = self.booted.pop(key)
185             else:
186                 record = self._new_node(node)
187             self.cloud_nodes.add(record)
188             for arv_rec in self.arvados_nodes.unpaired():
189                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
190                     self._pair_nodes(record, arv_rec.arvados_node)
191                     break
192         for key, record in self.cloud_nodes.orphans.iteritems():
193             if key in self.shutdowns:
194                 try:
195                     self.shutdowns[key].stop().get()
196                 except pykka.ActorDeadError:
197                     pass
198                 del self.shutdowns[key]
199             record.actor.stop()
200             record.cloud_node = None
201
202     def update_arvados_nodes(self, nodelist):
203         self._update_poll_time('arvados_nodes')
204         for key, node in self.arvados_nodes.update_from(nodelist):
205             self._logger.info("Registering new Arvados node %s", key)
206             record = _ComputeNodeRecord(arvados_node=node)
207             self.arvados_nodes.add(record)
208         for arv_rec in self.arvados_nodes.unpaired():
209             arv_node = arv_rec.arvados_node
210             for cloud_rec in self.cloud_nodes.unpaired():
211                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
212                     self._pair_nodes(cloud_rec, arv_node)
213                     break
214
215     def _nodes_up(self):
216         return sum(len(nodelist) for nodelist in
217                    [self.cloud_nodes, self.booted, self.booting])
218
219     def _nodes_busy(self):
220         return sum(1 for busy in
221                    pykka.get_all(rec.actor.in_state('busy') for rec in
222                                  self.cloud_nodes.nodes.itervalues())
223                    if busy)
224
225     def _nodes_missing(self):
226         return sum(1 for arv_node in
227                    pykka.get_all(rec.actor.arvados_node for rec in
228                                  self.cloud_nodes.nodes.itervalues()
229                                  if rec.actor.cloud_node.get().id not in self.shutdowns)
230                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
231
232     def _nodes_wanted(self):
233         up_count = self._nodes_up()
234         under_min = self.min_nodes - up_count
235         over_max = up_count - self.max_nodes
236         if over_max >= 0:
237             return -over_max
238         elif under_min > 0:
239             return under_min
240         else:
241             up_count -= len(self.shutdowns) + self._nodes_busy() + self._nodes_missing()
242             return len(self.last_wishlist) - up_count
243
244     def _nodes_excess(self):
245         up_count = self._nodes_up() - len(self.shutdowns)
246         over_min = up_count - self.min_nodes
247         if over_min <= 0:
248             return over_min
249         else:
250             return up_count - self._nodes_busy() - len(self.last_wishlist)
251
252     def update_server_wishlist(self, wishlist):
253         self._update_poll_time('server_wishlist')
254         self.last_wishlist = wishlist
255         nodes_wanted = self._nodes_wanted()
256         if nodes_wanted > 0:
257             self._later.start_node()
258         elif (nodes_wanted < 0) and self.booting:
259             self._later.stop_booting_node()
260
261     def _check_poll_freshness(orig_func):
262         """Decorator to inhibit a method when poll information is stale.
263
264         This decorator checks the timestamps of all the poll information the
265         daemon has received.  The decorated method is only called if none
266         of the timestamps are considered stale.
267         """
268         @functools.wraps(orig_func)
269         def wrapper(self, *args, **kwargs):
270             now = time.time()
271             if all(now - t < self.poll_stale_after
272                    for t in self.last_polls.itervalues()):
273                 return orig_func(self, *args, **kwargs)
274             else:
275                 return None
276         return wrapper
277
278     @_check_poll_freshness
279     def start_node(self):
280         nodes_wanted = self._nodes_wanted()
281         if nodes_wanted < 1:
282             return None
283         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
284         try:
285             cloud_size = self.last_wishlist[self._nodes_up()]
286         except IndexError:
287             cloud_size = self.min_cloud_size
288         self._logger.info("Want %s more nodes.  Booting a %s node.",
289                           nodes_wanted, cloud_size.name)
290         new_setup = self._node_setup.start(
291             timer_actor=self._timer,
292             arvados_client=self._new_arvados(),
293             arvados_node=arvados_node,
294             cloud_client=self._new_cloud(),
295             cloud_size=cloud_size).proxy()
296         self.booting[new_setup.actor_ref.actor_urn] = new_setup
297         if arvados_node is not None:
298             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
299                 time.time())
300         new_setup.subscribe(self._later.node_up)
301         if nodes_wanted > 1:
302             self._later.start_node()
303
304     def _get_actor_attrs(self, actor, *attr_names):
305         return pykka.get_all([getattr(actor, name) for name in attr_names])
306
307     def node_up(self, setup_proxy):
308         cloud_node = setup_proxy.cloud_node.get()
309         del self.booting[setup_proxy.actor_ref.actor_urn]
310         setup_proxy.stop()
311         record = self.cloud_nodes.get(cloud_node.id)
312         if record is None:
313             record = self._new_node(cloud_node)
314             self.booted[cloud_node.id] = record
315         self._timer.schedule(time.time() + self.boot_fail_after,
316                              self._later.shutdown_unpaired_node, cloud_node.id)
317
318     @_check_poll_freshness
319     def stop_booting_node(self):
320         nodes_excess = self._nodes_excess()
321         if (nodes_excess < 1) or not self.booting:
322             return None
323         for key, node in self.booting.iteritems():
324             if node.stop_if_no_cloud_node().get():
325                 del self.booting[key]
326                 if nodes_excess > 1:
327                     self._later.stop_booting_node()
328                 break
329
330     def _begin_node_shutdown(self, node_actor, cancellable):
331         cloud_node_id = node_actor.cloud_node.get().id
332         if cloud_node_id in self.shutdowns:
333             return None
334         shutdown = self._node_shutdown.start(
335             timer_actor=self._timer, cloud_client=self._new_cloud(),
336             arvados_client=self._new_arvados(),
337             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
338         self.shutdowns[cloud_node_id] = shutdown
339         shutdown.subscribe(self._later.node_finished_shutdown)
340
341     @_check_poll_freshness
342     def node_can_shutdown(self, node_actor):
343         if self._nodes_excess() > 0:
344             self._begin_node_shutdown(node_actor, cancellable=True)
345
346     def shutdown_unpaired_node(self, cloud_node_id):
347         for record_dict in [self.cloud_nodes, self.booted]:
348             if cloud_node_id in record_dict:
349                 record = record_dict[cloud_node_id]
350                 break
351         else:
352             return None
353         if not record.actor.in_state('idle', 'busy').get():
354             self._begin_node_shutdown(record.actor, cancellable=False)
355
356     def node_finished_shutdown(self, shutdown_actor):
357         cloud_node, success, cancel_reason = self._get_actor_attrs(
358             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
359         shutdown_actor.stop()
360         cloud_node_id = cloud_node.id
361         if not success:
362             if cancel_reason == self._node_shutdown.NODE_BROKEN:
363                 self.cloud_nodes.blacklist(cloud_node_id)
364             del self.shutdowns[cloud_node_id]
365         elif cloud_node_id in self.booted:
366             self.booted.pop(cloud_node_id).actor.stop()
367             del self.shutdowns[cloud_node_id]
368
369     def shutdown(self):
370         self._logger.info("Shutting down after signal.")
371         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
372         setup_stops = {key: node.stop_if_no_cloud_node()
373                        for key, node in self.booting.iteritems()}
374         self.booting = {key: self.booting[key]
375                         for key in setup_stops if not setup_stops[key].get()}
376         self._later.await_shutdown()
377
378     def await_shutdown(self):
379         if self.booting:
380             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
381         else:
382             self.stop()