Merge branch '9303-actor-dead-dead' refs #9303
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22         self.shutdown_actor = None
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
71 class _CloudNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'cloud_node'
73     PAIR_ATTR = 'arvados_node'
74     item_key = staticmethod(lambda cloud_node: cloud_node.id)
75
76
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78     RECORD_ATTR = 'arvados_node'
79     PAIR_ATTR = 'cloud_node'
80     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
81
82     def find_stale_node(self, stale_time):
83         for record in self.nodes.itervalues():
84             node = record.arvados_node
85             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
86                                           stale_time) and
87                   not cnode.timestamp_fresh(record.assignment_time,
88                                             stale_time)):
89                 return node
90         return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor,
114                  max_total_price=0):
115         super(NodeManagerDaemonActor, self).__init__()
116         self._node_setup = node_setup_class
117         self._node_shutdown = node_shutdown_class
118         self._node_actor = node_actor_class
119         self._cloud_updater = cloud_update_actor
120         self._timer = timer_actor
121         self._new_arvados = arvados_factory
122         self._new_cloud = cloud_factory
123         self._cloud_driver = self._new_cloud()
124         self._later = self.actor_ref.tell_proxy()
125         self.shutdown_windows = shutdown_windows
126         self.server_calculator = server_calculator
127         self.min_cloud_size = self.server_calculator.cheapest_size()
128         self.min_nodes = min_nodes
129         self.max_nodes = max_nodes
130         self.max_total_price = max_total_price
131         self.poll_stale_after = poll_stale_after
132         self.boot_fail_after = boot_fail_after
133         self.node_stale_after = node_stale_after
134         self.last_polls = {}
135         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136             poll_actor = locals()[poll_name + '_actor']
137             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139             self.last_polls[poll_name] = -self.poll_stale_after
140         self.cloud_nodes = _CloudNodeTracker()
141         self.arvados_nodes = _ArvadosNodeTracker()
142         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
143         self.sizes_booting = {} # Actor IDs to node size
144
145     def on_start(self):
146         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
147         self._logger.debug("Daemon started")
148
149     def _update_poll_time(self, poll_key):
150         self.last_polls[poll_key] = time.time()
151
152     def _pair_nodes(self, node_record, arvados_node):
153         self._logger.info("Cloud node %s is now paired with Arvados node %s",
154                           node_record.cloud_node.name, arvados_node['uuid'])
155         self._arvados_nodes_actor.subscribe_to(
156             arvados_node['uuid'], node_record.actor.update_arvados_node)
157         node_record.arvados_node = arvados_node
158         self.arvados_nodes.add(node_record)
159
160     def _new_node(self, cloud_node):
161         start_time = self._cloud_driver.node_start_time(cloud_node)
162         shutdown_timer = cnode.ShutdownTimer(start_time,
163                                              self.shutdown_windows)
164         actor = self._node_actor.start(
165             cloud_node=cloud_node,
166             cloud_node_start_time=start_time,
167             shutdown_timer=shutdown_timer,
168             cloud_fqdn_func=self._cloud_driver.node_fqdn,
169             update_actor=self._cloud_updater,
170             timer_actor=self._timer,
171             arvados_node=None,
172             poll_stale_after=self.poll_stale_after,
173             node_stale_after=self.node_stale_after,
174             cloud_client=self._cloud_driver,
175             boot_fail_after=self.boot_fail_after)
176         actorTell = actor.tell_proxy()
177         actorTell.subscribe(self._later.node_can_shutdown)
178         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
179                                              actorTell.update_cloud_node)
180         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
181         return record
182
183     def _register_cloud_node(self, node):
184         rec = self.cloud_nodes.get(node.id)
185         if rec is None:
186             self._logger.info("Registering new cloud node %s", node.id)
187             record = self._new_node(node)
188             self.cloud_nodes.add(record)
189         else:
190             rec.cloud_node = node
191
192     def update_cloud_nodes(self, nodelist):
193         self._update_poll_time('cloud_nodes')
194         for _, node in self.cloud_nodes.update_from(nodelist):
195             self._register_cloud_node(node)
196
197         self.try_pairing()
198
199         for record in self.cloud_nodes.orphans.itervalues():
200             if record.shutdown_actor:
201                 try:
202                     record.shutdown_actor.stop()
203                 except pykka.ActorDeadError:
204                     pass
205                 record.shutdown_actor = None
206
207             # A recently booted node is a node that successfully completed the
208             # setup actor but has not yet appeared in the cloud node list.
209             # This will have the tag _nodemanager_recently_booted on it, which
210             # means (if we're not shutting it down) we want to put it back into
211             # the cloud node list.  Once it really appears in the cloud list,
212             # the object in record.cloud_node will be replaced by a new one
213             # that lacks the "_nodemanager_recently_booted" tag.
214             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
215                 self.cloud_nodes.add(record)
216             else:
217                 record.actor.stop()
218                 record.cloud_node = None
219
220     def _register_arvados_node(self, key, arv_node):
221         self._logger.info("Registering new Arvados node %s", key)
222         record = _ComputeNodeRecord(arvados_node=arv_node)
223         self.arvados_nodes.add(record)
224
225     def update_arvados_nodes(self, nodelist):
226         self._update_poll_time('arvados_nodes')
227         for key, node in self.arvados_nodes.update_from(nodelist):
228             self._register_arvados_node(key, node)
229         self.try_pairing()
230
231     def try_pairing(self):
232         for record in self.cloud_nodes.unpaired():
233             for arv_rec in self.arvados_nodes.unpaired():
234                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
235                     self._pair_nodes(record, arv_rec.arvados_node)
236                     break
237
238     def _nodes_booting(self, size):
239         s = sum(1
240                 for c in self.booting.iterkeys()
241                 if size is None or self.sizes_booting[c].id == size.id)
242         return s
243
244     def _node_states(self, size):
245         states = pykka.get_all(rec.actor.get_state()
246                                for rec in self.cloud_nodes.nodes.itervalues()
247                                if ((size is None or rec.cloud_node.size.id == size.id) and
248                                    rec.shutdown_actor is None))
249         states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
250                    if ((size is None or rec.cloud_node.size.id == size.id) and
251                        rec.shutdown_actor is not None)]
252         return states
253
254     def _state_counts(self, size):
255         states = self._node_states(size)
256         counts = {
257             "booting": self._nodes_booting(size),
258             "unpaired": 0,
259             "busy": 0,
260             "idle": 0,
261             "down": 0,
262             "shutdown": 0
263         }
264         for s in states:
265             counts[s] = counts[s] + 1
266         return counts
267
268     def _nodes_up(self, counts):
269         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
270         return up
271
272     def _total_price(self):
273         cost = 0
274         cost += sum(self.server_calculator.find_size(self.sizes_booting[c].id).price
275                   for c in self.booting.iterkeys())
276         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
277                     for c in self.cloud_nodes.nodes.itervalues())
278         return cost
279
280     def _size_wishlist(self, size):
281         return sum(1 for c in self.last_wishlist if c.id == size.id)
282
283     def _nodes_wanted(self, size):
284         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
285         under_min = self.min_nodes - total_node_count
286         over_max = total_node_count - self.max_nodes
287         total_price = self._total_price()
288
289         counts = self._state_counts(size)
290
291         up_count = self._nodes_up(counts)
292         busy_count = counts["busy"]
293
294         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
295                           self._size_wishlist(size),
296                           up_count,
297                           counts["booting"],
298                           counts["unpaired"],
299                           counts["idle"],
300                           busy_count,
301                           counts["down"],
302                           counts["shutdown"])
303
304         if over_max >= 0:
305             return -over_max
306         elif under_min > 0 and size.id == self.min_cloud_size.id:
307             return under_min
308
309         wanted = self._size_wishlist(size) - (up_count - busy_count)
310         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
311             can_boot = int((self.max_total_price - total_price) / size.price)
312             if can_boot == 0:
313                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
314                                   size.name, size.price, self.max_total_price, total_price)
315             return can_boot
316         else:
317             return wanted
318
319     def _nodes_excess(self, size):
320         counts = self._state_counts(size)
321         up_count = self._nodes_up(counts)
322         if size.id == self.min_cloud_size.id:
323             up_count -= self.min_nodes
324         return up_count - (counts["busy"] + self._size_wishlist(size))
325
326     def update_server_wishlist(self, wishlist):
327         self._update_poll_time('server_wishlist')
328         self.last_wishlist = wishlist
329         for size in reversed(self.server_calculator.cloud_sizes):
330             try:
331                 nodes_wanted = self._nodes_wanted(size)
332                 if nodes_wanted > 0:
333                     self._later.start_node(size)
334                 elif (nodes_wanted < 0) and self.booting:
335                     self._later.stop_booting_node(size)
336             except Exception as e:
337                 self._logger.exception("while calculating nodes wanted for size %s", size)
338
339     def _check_poll_freshness(orig_func):
340         """Decorator to inhibit a method when poll information is stale.
341
342         This decorator checks the timestamps of all the poll information the
343         daemon has received.  The decorated method is only called if none
344         of the timestamps are considered stale.
345         """
346         @functools.wraps(orig_func)
347         def wrapper(self, *args, **kwargs):
348             now = time.time()
349             if all(now - t < self.poll_stale_after
350                    for t in self.last_polls.itervalues()):
351                 return orig_func(self, *args, **kwargs)
352             else:
353                 return None
354         return wrapper
355
356     @_check_poll_freshness
357     def start_node(self, cloud_size):
358         nodes_wanted = self._nodes_wanted(cloud_size)
359         if nodes_wanted < 1:
360             return None
361         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
362         self._logger.info("Want %i more %s nodes.  Booting a node.",
363                           nodes_wanted, cloud_size.name)
364         new_setup = self._node_setup.start(
365             timer_actor=self._timer,
366             arvados_client=self._new_arvados(),
367             arvados_node=arvados_node,
368             cloud_client=self._new_cloud(),
369             cloud_size=cloud_size).proxy()
370         self.booting[new_setup.actor_ref.actor_urn] = new_setup
371         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
372
373         if arvados_node is not None:
374             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
375                 time.time())
376         new_setup.subscribe(self._later.node_up)
377         if nodes_wanted > 1:
378             self._later.start_node(cloud_size)
379
380     def _get_actor_attrs(self, actor, *attr_names):
381         return pykka.get_all([getattr(actor, name) for name in attr_names])
382
383     def node_up(self, setup_proxy):
384         # Called when a SetupActor has completed.
385         cloud_node, arvados_node = self._get_actor_attrs(
386             setup_proxy, 'cloud_node', 'arvados_node')
387         setup_proxy.stop()
388
389         # If cloud_node is None then the node create wasn't
390         # successful and so there isn't anything to do.
391         if cloud_node is not None:
392             # Node creation succeeded.  Update cloud node list.
393             cloud_node._nodemanager_recently_booted = True
394             self._register_cloud_node(cloud_node)
395         del self.booting[setup_proxy.actor_ref.actor_urn]
396         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
397
398     @_check_poll_freshness
399     def stop_booting_node(self, size):
400         nodes_excess = self._nodes_excess(size)
401         if (nodes_excess < 1) or not self.booting:
402             return None
403         for key, node in self.booting.iteritems():
404             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
405                 del self.booting[key]
406                 del self.sizes_booting[key]
407
408                 if nodes_excess > 1:
409                     self._later.stop_booting_node(size)
410                 break
411
412     def _begin_node_shutdown(self, node_actor, cancellable):
413         cloud_node_obj = node_actor.cloud_node.get()
414         cloud_node_id = cloud_node_obj.id
415         record = self.cloud_nodes[cloud_node_id]
416         if record.shutdown_actor is not None:
417             return None
418         shutdown = self._node_shutdown.start(
419             timer_actor=self._timer, cloud_client=self._new_cloud(),
420             arvados_client=self._new_arvados(),
421             node_monitor=node_actor.actor_ref, cancellable=cancellable)
422         record.shutdown_actor = shutdown.proxy()
423         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
424
425     @_check_poll_freshness
426     def node_can_shutdown(self, node_actor):
427         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
428             self._begin_node_shutdown(node_actor, cancellable=True)
429         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
430             # Node is unpaired, which means it probably exceeded its booting
431             # grace period without a ping, so shut it down so we can boot a new
432             # node in its place.
433             self._begin_node_shutdown(node_actor, cancellable=False)
434         elif node_actor.in_state('down').get():
435             # Node is down and unlikely to come back.
436             self._begin_node_shutdown(node_actor, cancellable=False)
437
438     def node_finished_shutdown(self, shutdown_actor):
439         try:
440             cloud_node, success, cancel_reason = self._get_actor_attrs(
441                 shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
442         except pykka.ActorDeadError:
443             return
444         cloud_node_id = cloud_node.id
445         record = self.cloud_nodes[cloud_node_id]
446         shutdown_actor.stop()
447         if not success:
448             if cancel_reason == self._node_shutdown.NODE_BROKEN:
449                 self.cloud_nodes.blacklist(cloud_node_id)
450             record.shutdown_actor = None
451         else:
452             # If the node went from being booted to being shut down without ever
453             # appearing in the cloud node list, it will have the
454             # _nodemanager_recently_booted tag, so get rid of it so that the node
455             # can be forgotten completely.
456             if hasattr(self.cloud_nodes[cloud_node_id].cloud_node, "_nodemanager_recently_booted"):
457                 del self.cloud_nodes[cloud_node_id].cloud_node._nodemanager_recently_booted
458
459     def shutdown(self):
460         self._logger.info("Shutting down after signal.")
461         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
462         setup_stops = {key: node.stop_if_no_cloud_node()
463                        for key, node in self.booting.iteritems()}
464         self.booting = {key: self.booting[key]
465                         for key in setup_stops if not setup_stops[key].get()}
466         self._later.await_shutdown()
467
468     def await_shutdown(self):
469         if self.booting:
470             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
471         else:
472             self.stop()