13804: Try to cancel pending shutdowns if nodes are needed
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         # Try to select a stale node record that have an assigned slot first
82         for record in sorted(self.nodes.itervalues(),
83                              key=lambda r: r.arvados_node['slot_number'],
84                              reverse=True):
85             node = record.arvados_node
86             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87                                           stale_time) and
88                   not cnode.timestamp_fresh(record.assignment_time,
89                                             stale_time)):
90                 return node
91         return None
92
93
94 class NodeManagerDaemonActor(actor_class):
95     """Node Manager daemon.
96
97     This actor subscribes to all information polls about cloud nodes,
98     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
99     for every cloud node, subscribing them to poll updates
100     appropriately.  It creates and destroys cloud nodes based on job queue
101     demand, and stops the corresponding ComputeNode actors when their work
102     is done.
103     """
104     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105                  cloud_nodes_actor, cloud_update_actor, timer_actor,
106                  arvados_factory, cloud_factory,
107                  shutdown_windows, server_calculator,
108                  min_nodes, max_nodes,
109                  poll_stale_after=600,
110                  boot_fail_after=1800,
111                  node_stale_after=7200,
112                  node_setup_class=dispatch.ComputeNodeSetupActor,
113                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114                  node_actor_class=dispatch.ComputeNodeMonitorActor,
115                  max_total_price=0):
116         super(NodeManagerDaemonActor, self).__init__()
117         self._node_setup = node_setup_class
118         self._node_shutdown = node_shutdown_class
119         self._node_actor = node_actor_class
120         self._cloud_updater = cloud_update_actor
121         self._timer = timer_actor
122         self._new_arvados = arvados_factory
123         self._new_cloud = cloud_factory
124         self._cloud_driver = self._new_cloud()
125         self._later = self.actor_ref.tell_proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.node_quota = max_nodes
132         self.max_total_price = max_total_price
133         self.poll_stale_after = poll_stale_after
134         self.boot_fail_after = boot_fail_after
135         self.node_stale_after = node_stale_after
136         self.last_polls = {}
137         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
138             poll_actor = locals()[poll_name + '_actor']
139             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
140             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
141             self.last_polls[poll_name] = -self.poll_stale_after
142         self.cloud_nodes = _CloudNodeTracker()
143         self.arvados_nodes = _ArvadosNodeTracker()
144         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
145         self.sizes_booting = {} # Actor IDs to node size
146
147     def on_start(self):
148         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149         self._logger.debug("Daemon started")
150
151     def _update_poll_time(self, poll_key):
152         self.last_polls[poll_key] = time.time()
153
154     def _pair_nodes(self, node_record, arvados_node):
155         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
156                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
157         self._arvados_nodes_actor.subscribe_to(
158             arvados_node['uuid'], node_record.actor.update_arvados_node)
159         node_record.arvados_node = arvados_node
160         self.arvados_nodes.add(node_record)
161
162     def _new_node(self, cloud_node):
163         start_time = self._cloud_driver.node_start_time(cloud_node)
164         shutdown_timer = cnode.ShutdownTimer(start_time,
165                                              self.shutdown_windows)
166         actor = self._node_actor.start(
167             cloud_node=cloud_node,
168             cloud_node_start_time=start_time,
169             shutdown_timer=shutdown_timer,
170             update_actor=self._cloud_updater,
171             timer_actor=self._timer,
172             arvados_node=None,
173             poll_stale_after=self.poll_stale_after,
174             node_stale_after=self.node_stale_after,
175             cloud_client=self._cloud_driver,
176             boot_fail_after=self.boot_fail_after)
177         actorTell = actor.tell_proxy()
178         actorTell.subscribe(self._later.node_can_shutdown)
179         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
180                                              actorTell.update_cloud_node)
181         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
182         return record
183
184     def _register_cloud_node(self, node):
185         rec = self.cloud_nodes.get(node.id)
186         if rec is None:
187             self._logger.info("Registering new cloud node %s", node.id)
188             record = self._new_node(node)
189             self.cloud_nodes.add(record)
190         else:
191             rec.cloud_node = node
192
193     def update_cloud_nodes(self, nodelist):
194         self._update_poll_time('cloud_nodes')
195         for _, node in self.cloud_nodes.update_from(nodelist):
196             self._register_cloud_node(node)
197
198         self.try_pairing()
199
200         for record in self.cloud_nodes.orphans.itervalues():
201             if record.shutdown_actor:
202                 try:
203                     record.shutdown_actor.stop()
204                 except pykka.ActorDeadError:
205                     pass
206                 record.shutdown_actor = None
207
208             # A recently booted node is a node that successfully completed the
209             # setup actor but has not yet appeared in the cloud node list.
210             # This will have the tag _nodemanager_recently_booted on it, which
211             # means (if we're not shutting it down) we want to put it back into
212             # the cloud node list.  Once it really appears in the cloud list,
213             # the object in record.cloud_node will be replaced by a new one
214             # that lacks the "_nodemanager_recently_booted" tag.
215             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
216                 self.cloud_nodes.add(record)
217             else:
218                 # Node disappeared from the cloud node list. If it's paired,
219                 # remove its idle time counter.
220                 if record.arvados_node:
221                     status.tracker.idle_out(record.arvados_node.get('hostname'))
222                 # Stop the monitor actor if necessary and forget about the node.
223                 if record.actor:
224                     try:
225                         record.actor.stop()
226                     except pykka.ActorDeadError:
227                         pass
228                     record.actor = None
229                 record.cloud_node = None
230
231     def _register_arvados_node(self, key, arv_node):
232         self._logger.info("Registering new Arvados node %s", key)
233         record = _ComputeNodeRecord(arvados_node=arv_node)
234         self.arvados_nodes.add(record)
235
236     def update_arvados_nodes(self, nodelist):
237         self._update_poll_time('arvados_nodes')
238         for key, node in self.arvados_nodes.update_from(nodelist):
239             self._register_arvados_node(key, node)
240         self.try_pairing()
241
242     def try_pairing(self):
243         for record in self.cloud_nodes.unpaired():
244             for arv_rec in self.arvados_nodes.unpaired():
245                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
246                     self._pair_nodes(record, arv_rec.arvados_node)
247                     break
248
249     def _nodes_booting(self, size):
250         s = sum(1
251                 for c in self.booting.iterkeys()
252                 if size is None or self.sizes_booting[c].id == size.id)
253         return s
254
255     def _node_states(self, size):
256         proxy_states = []
257         states = []
258         for rec in self.cloud_nodes.nodes.itervalues():
259             if size is None or rec.cloud_node.size.id == size.id:
260                 if rec.shutdown_actor is None and rec.actor is not None:
261                     proxy_states.append(rec.actor.get_state())
262                 else:
263                     states.append("shutdown")
264         return states + pykka.get_all(proxy_states)
265
266     def _update_tracker(self):
267         updates = {
268             k: 0
269             for k in status.tracker.keys()
270             if k.startswith('nodes_')
271         }
272         for s in self._node_states(size=None):
273             updates.setdefault('nodes_'+s, 0)
274             updates['nodes_'+s] += 1
275         updates['nodes_wish'] = len(self.last_wishlist)
276         updates['node_quota'] = self.node_quota
277         status.tracker.update(updates)
278
279     def _state_counts(self, size):
280         states = self._node_states(size)
281         counts = {
282             "booting": self._nodes_booting(size),
283             "unpaired": 0,
284             "busy": 0,
285             "idle": 0,
286             "fail": 0,
287             "down": 0,
288             "shutdown": 0
289         }
290         for s in states:
291             counts[s] = counts[s] + 1
292         return counts
293
294     def _nodes_up(self, counts):
295         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
296         return up
297
298     def _total_price(self):
299         cost = 0
300         cost += sum(self.sizes_booting[c].price
301                     for c in self.booting.iterkeys())
302         cost += sum(c.cloud_node.size.price
303                     for c in self.cloud_nodes.nodes.itervalues())
304         return cost
305
306     def _size_wishlist(self, size):
307         return sum(1 for c in self.last_wishlist if c.id == size.id)
308
309     def _nodes_wanted(self, size):
310         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
311         under_min = self.min_nodes - total_node_count
312         over_max = total_node_count - self.node_quota
313         total_price = self._total_price()
314
315         counts = self._state_counts(size)
316
317         up_count = self._nodes_up(counts)
318         busy_count = counts["busy"]
319         wishlist_count = self._size_wishlist(size)
320
321         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
322                           wishlist_count,
323                           up_count,
324                           counts["booting"],
325                           counts["unpaired"],
326                           counts["idle"],
327                           busy_count,
328                           counts["down"]+counts["fail"],
329                           counts["shutdown"])
330
331         if over_max >= 0:
332             return -over_max
333         elif under_min > 0 and size.id == self.min_cloud_size.id:
334             return under_min
335
336         wanted = wishlist_count - (up_count - busy_count)
337         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
338             can_boot = int((self.max_total_price - total_price) / size.price)
339             if can_boot == 0:
340                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
341                                   size.name, size.price, self.max_total_price, total_price)
342             return can_boot
343         else:
344             return wanted
345
346     def _nodes_excess(self, size):
347         counts = self._state_counts(size)
348         up_count = self._nodes_up(counts)
349         if size.id == self.min_cloud_size.id:
350             up_count -= self.min_nodes
351         return up_count - (counts["busy"] + self._size_wishlist(size))
352
353     def update_server_wishlist(self, wishlist):
354         self._update_poll_time('server_wishlist')
355         requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
356         self.last_wishlist = wishlist[:requestable_nodes]
357         for size in reversed(self.server_calculator.cloud_sizes):
358             try:
359                 nodes_wanted = self._nodes_wanted(size)
360                 if nodes_wanted > 0:
361                     self._later.start_node(size)
362                 elif (nodes_wanted < 0) and self.booting:
363                     self._later.stop_booting_node(size)
364             except Exception:
365                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
366         try:
367             self._update_tracker()
368         except:
369             self._logger.exception("while updating tracker")
370
371     def _check_poll_freshness(orig_func):
372         """Decorator to inhibit a method when poll information is stale.
373
374         This decorator checks the timestamps of all the poll information the
375         daemon has received.  The decorated method is only called if none
376         of the timestamps are considered stale.
377         """
378         @functools.wraps(orig_func)
379         def wrapper(self, *args, **kwargs):
380             now = time.time()
381             if all(now - t < self.poll_stale_after
382                    for t in self.last_polls.itervalues()):
383                 return orig_func(self, *args, **kwargs)
384             else:
385                 return None
386         return wrapper
387
388     @_check_poll_freshness
389     def start_node(self, cloud_size):
390         nodes_wanted = self._nodes_wanted(cloud_size)
391         if nodes_wanted < 1:
392             return None
393
394         if not self.cancel_node_shutdown(cloud_size):
395             arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
396             self._logger.info("Want %i more %s nodes.  Booting a node.",
397                               nodes_wanted, cloud_size.name)
398             new_setup = self._node_setup.start(
399                 timer_actor=self._timer,
400                 arvados_client=self._new_arvados(),
401                 arvados_node=arvados_node,
402                 cloud_client=self._new_cloud(),
403                 cloud_size=self.server_calculator.find_size(cloud_size.id))
404             self.booting[new_setup.actor_urn] = new_setup.proxy()
405             self.sizes_booting[new_setup.actor_urn] = cloud_size
406
407             if arvados_node is not None:
408                 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
409                     time.time())
410             new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
411
412         if nodes_wanted > 1:
413             self._later.start_node(cloud_size)
414
415     def _get_actor_attrs(self, actor, *attr_names):
416         return pykka.get_all([getattr(actor, name) for name in attr_names])
417
418     def node_setup_finished(self, setup_proxy):
419         # Called when a SetupActor has completed.
420         cloud_node, arvados_node, error = self._get_actor_attrs(
421             setup_proxy, 'cloud_node', 'arvados_node', 'error')
422         setup_proxy.stop()
423
424         if cloud_node is None:
425             # If cloud_node is None then the node create wasn't successful.
426             if error == dispatch.QuotaExceeded:
427                 # We've hit a quota limit, so adjust node_quota to stop trying to
428                 # boot new nodes until the node count goes down.
429                 self.node_quota = len(self.cloud_nodes)
430                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
431         else:
432             # Node creation succeeded.  Update cloud node list.
433             cloud_node._nodemanager_recently_booted = True
434             self._register_cloud_node(cloud_node)
435
436             # Different quota policies may in force depending on the cloud
437             # provider, account limits, and the specific mix of nodes sizes
438             # that are already created.  If we are right at the quota limit,
439             # we want to probe to see if the last quota still applies or if we
440             # are allowed to create more nodes.
441             #
442             # For example, if the quota is actually based on core count, the
443             # quota might be 20 single-core machines or 10 dual-core machines.
444             # If we previously set node_quota to 10 dual core machines, but are
445             # now booting single core machines (actual quota 20), we want to
446             # allow the quota to expand so we don't get stuck at 10 machines
447             # forever.
448             if len(self.cloud_nodes) >= self.node_quota:
449                 self.node_quota = len(self.cloud_nodes)+1
450                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
451
452         self.node_quota = min(self.node_quota, self.max_nodes)
453         del self.booting[setup_proxy.actor_ref.actor_urn]
454         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
455
456     @_check_poll_freshness
457     def stop_booting_node(self, size):
458         nodes_excess = self._nodes_excess(size)
459         if (nodes_excess < 1) or not self.booting:
460             return None
461         for key, node in self.booting.iteritems():
462             try:
463                 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
464                     del self.booting[key]
465                     del self.sizes_booting[key]
466                     if nodes_excess > 1:
467                         self._later.stop_booting_node(size)
468                     return
469             except pykka.Timeout:
470                 pass
471
472     @_check_poll_freshness
473     def cancel_node_shutdown(self, size):
474         # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
475         for record in self.cloud_nodes.nodes.itervalues():
476             try:
477                 if (record.shutdown_actor is not None and
478                     record.size.id == size.id and
479                     record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
480                         return True
481             except (pykka.ActorDeadError, pykka.Timeout) as e:
482                 pass
483         return False
484
485     def _begin_node_shutdown(self, node_actor, cancellable):
486         cloud_node_obj = node_actor.cloud_node.get()
487         cloud_node_id = cloud_node_obj.id
488         record = self.cloud_nodes[cloud_node_id]
489         if record.shutdown_actor is not None:
490             return None
491         shutdown = self._node_shutdown.start(
492             timer_actor=self._timer, cloud_client=self._new_cloud(),
493             arvados_client=self._new_arvados(),
494             node_monitor=node_actor.actor_ref, cancellable=cancellable)
495         record.shutdown_actor = shutdown.proxy()
496         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
497
498     @_check_poll_freshness
499     def node_can_shutdown(self, node_actor):
500         try:
501             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
502                 self._begin_node_shutdown(node_actor, cancellable=True)
503             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
504                 # Node is unpaired, which means it probably exceeded its booting
505                 # grace period without a ping, so shut it down so we can boot a new
506                 # node in its place.
507                 self._begin_node_shutdown(node_actor, cancellable=False)
508             elif node_actor.in_state('down', 'fail').get():
509                 # Node is down and unlikely to come back.
510                 self._begin_node_shutdown(node_actor, cancellable=False)
511         except pykka.ActorDeadError as e:
512             # The monitor actor sends shutdown suggestions every time the
513             # node's state is updated, and these go into the daemon actor's
514             # message queue.  It's possible that the node has already been shut
515             # down (which shuts down the node monitor actor).  In that case,
516             # this message is stale and we'll get ActorDeadError when we try to
517             # access node_actor.  Log the error.
518             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
519
520     def node_finished_shutdown(self, shutdown_actor):
521         try:
522             cloud_node, success = self._get_actor_attrs(
523                 shutdown_actor, 'cloud_node', 'success')
524         except pykka.ActorDeadError:
525             return
526         cloud_node_id = cloud_node.id
527
528         try:
529             shutdown_actor.stop()
530         except pykka.ActorDeadError:
531             pass
532
533         try:
534             record = self.cloud_nodes[cloud_node_id]
535         except KeyError:
536             # Cloud node was already removed from the cloud node list
537             # supposedly while the destroy_node call was finishing its
538             # job.
539             return
540         record.shutdown_actor = None
541
542         if not success:
543             return
544
545         # Shutdown was successful, so stop the monitor actor, otherwise it
546         # will keep offering the node as a candidate for shutdown.
547         record.actor.stop()
548         record.actor = None
549
550         # If the node went from being booted to being shut down without ever
551         # appearing in the cloud node list, it will have the
552         # _nodemanager_recently_booted tag, so get rid of it so that the node
553         # can be forgotten completely.
554         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
555             del record.cloud_node._nodemanager_recently_booted
556
557     def shutdown(self):
558         self._logger.info("Shutting down after signal.")
559         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
560
561         # Shut down pollers
562         self._server_wishlist_actor.stop()
563         self._arvados_nodes_actor.stop()
564         self._cloud_nodes_actor.stop()
565
566         # Clear cloud node list
567         self.update_cloud_nodes([])
568
569         # Stop setup actors unless they are in the middle of setup.
570         setup_stops = {key: node.stop_if_no_cloud_node()
571                        for key, node in self.booting.iteritems()}
572         self.booting = {key: self.booting[key]
573                         for key in setup_stops if not setup_stops[key].get()}
574         self._later.await_shutdown()
575
576     def await_shutdown(self):
577         if self.booting:
578             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
579         else:
580             self.stop()