12085: When an idle node disappears from the cloud node list, clear its counter.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         # Try to select a stale node record that have an assigned slot first
82         for record in sorted(self.nodes.itervalues(),
83                              key=lambda r: r.arvados_node['slot_number'],
84                              reverse=True):
85             node = record.arvados_node
86             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87                                           stale_time) and
88                   not cnode.timestamp_fresh(record.assignment_time,
89                                             stale_time)):
90                 return node
91         return None
92
93
94 class NodeManagerDaemonActor(actor_class):
95     """Node Manager daemon.
96
97     This actor subscribes to all information polls about cloud nodes,
98     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
99     for every cloud node, subscribing them to poll updates
100     appropriately.  It creates and destroys cloud nodes based on job queue
101     demand, and stops the corresponding ComputeNode actors when their work
102     is done.
103     """
104     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105                  cloud_nodes_actor, cloud_update_actor, timer_actor,
106                  arvados_factory, cloud_factory,
107                  shutdown_windows, server_calculator,
108                  min_nodes, max_nodes,
109                  poll_stale_after=600,
110                  boot_fail_after=1800,
111                  node_stale_after=7200,
112                  node_setup_class=dispatch.ComputeNodeSetupActor,
113                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114                  node_actor_class=dispatch.ComputeNodeMonitorActor,
115                  max_total_price=0):
116         super(NodeManagerDaemonActor, self).__init__()
117         self._node_setup = node_setup_class
118         self._node_shutdown = node_shutdown_class
119         self._node_actor = node_actor_class
120         self._cloud_updater = cloud_update_actor
121         self._timer = timer_actor
122         self._new_arvados = arvados_factory
123         self._new_cloud = cloud_factory
124         self._cloud_driver = self._new_cloud()
125         self._later = self.actor_ref.tell_proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.node_quota = max_nodes
132         self.max_total_price = max_total_price
133         self.poll_stale_after = poll_stale_after
134         self.boot_fail_after = boot_fail_after
135         self.node_stale_after = node_stale_after
136         self.last_polls = {}
137         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
138             poll_actor = locals()[poll_name + '_actor']
139             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
140             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
141             self.last_polls[poll_name] = -self.poll_stale_after
142         self.cloud_nodes = _CloudNodeTracker()
143         self.arvados_nodes = _ArvadosNodeTracker()
144         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
145         self.sizes_booting = {} # Actor IDs to node size
146
147     def on_start(self):
148         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149         self._logger.debug("Daemon started")
150
151     def _update_poll_time(self, poll_key):
152         self.last_polls[poll_key] = time.time()
153
154     def _pair_nodes(self, node_record, arvados_node):
155         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
156                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
157         self._arvados_nodes_actor.subscribe_to(
158             arvados_node['uuid'], node_record.actor.update_arvados_node)
159         node_record.arvados_node = arvados_node
160         self.arvados_nodes.add(node_record)
161
162     def _new_node(self, cloud_node):
163         start_time = self._cloud_driver.node_start_time(cloud_node)
164         shutdown_timer = cnode.ShutdownTimer(start_time,
165                                              self.shutdown_windows)
166         actor = self._node_actor.start(
167             cloud_node=cloud_node,
168             cloud_node_start_time=start_time,
169             shutdown_timer=shutdown_timer,
170             update_actor=self._cloud_updater,
171             timer_actor=self._timer,
172             arvados_node=None,
173             poll_stale_after=self.poll_stale_after,
174             node_stale_after=self.node_stale_after,
175             cloud_client=self._cloud_driver,
176             boot_fail_after=self.boot_fail_after)
177         actorTell = actor.tell_proxy()
178         actorTell.subscribe(self._later.node_can_shutdown)
179         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
180                                              actorTell.update_cloud_node)
181         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
182         return record
183
184     def _register_cloud_node(self, node):
185         rec = self.cloud_nodes.get(node.id)
186         if rec is None:
187             self._logger.info("Registering new cloud node %s", node.id)
188             record = self._new_node(node)
189             self.cloud_nodes.add(record)
190         else:
191             rec.cloud_node = node
192
193     def update_cloud_nodes(self, nodelist):
194         self._update_poll_time('cloud_nodes')
195         for _, node in self.cloud_nodes.update_from(nodelist):
196             self._register_cloud_node(node)
197
198         self.try_pairing()
199
200         for record in self.cloud_nodes.orphans.itervalues():
201             if record.shutdown_actor:
202                 try:
203                     record.shutdown_actor.stop()
204                 except pykka.ActorDeadError:
205                     pass
206                 record.shutdown_actor = None
207
208             # A recently booted node is a node that successfully completed the
209             # setup actor but has not yet appeared in the cloud node list.
210             # This will have the tag _nodemanager_recently_booted on it, which
211             # means (if we're not shutting it down) we want to put it back into
212             # the cloud node list.  Once it really appears in the cloud list,
213             # the object in record.cloud_node will be replaced by a new one
214             # that lacks the "_nodemanager_recently_booted" tag.
215             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
216                 self.cloud_nodes.add(record)
217             else:
218                 # Node disappeared from the cloud node list.  Stop the monitor
219                 # actor if necessary and forget about the node.
220                 if record.actor:
221                     try:
222                         # If it's paired and idle, stop its idle time counter
223                         # before removing the monitor actor.
224                         if record.actor.get_state().get() == 'idle':
225                             status.tracker.idle_out(
226                                 record.actor.arvados_node.get()['hostname'])
227                         record.actor.stop()
228                     except pykka.ActorDeadError:
229                         pass
230                     record.actor = None
231                 record.cloud_node = None
232
233     def _register_arvados_node(self, key, arv_node):
234         self._logger.info("Registering new Arvados node %s", key)
235         record = _ComputeNodeRecord(arvados_node=arv_node)
236         self.arvados_nodes.add(record)
237
238     def update_arvados_nodes(self, nodelist):
239         self._update_poll_time('arvados_nodes')
240         for key, node in self.arvados_nodes.update_from(nodelist):
241             self._register_arvados_node(key, node)
242         self.try_pairing()
243
244     def try_pairing(self):
245         for record in self.cloud_nodes.unpaired():
246             for arv_rec in self.arvados_nodes.unpaired():
247                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
248                     self._pair_nodes(record, arv_rec.arvados_node)
249                     break
250
251     def _nodes_booting(self, size):
252         s = sum(1
253                 for c in self.booting.iterkeys()
254                 if size is None or self.sizes_booting[c].id == size.id)
255         return s
256
257     def _node_states(self, size):
258         proxy_states = []
259         states = []
260         for rec in self.cloud_nodes.nodes.itervalues():
261             if size is None or rec.cloud_node.size.id == size.id:
262                 if rec.shutdown_actor is None and rec.actor is not None:
263                     proxy_states.append(rec.actor.get_state())
264                 else:
265                     states.append("shutdown")
266         return states + pykka.get_all(proxy_states)
267
268     def _update_tracker(self):
269         updates = {
270             k: 0
271             for k in status.tracker.keys()
272             if k.startswith('nodes_')
273         }
274         for s in self._node_states(size=None):
275             updates.setdefault('nodes_'+s, 0)
276             updates['nodes_'+s] += 1
277         updates['nodes_wish'] = len(self.last_wishlist)
278         updates['node_quota'] = self.node_quota
279         status.tracker.update(updates)
280
281     def _state_counts(self, size):
282         states = self._node_states(size)
283         counts = {
284             "booting": self._nodes_booting(size),
285             "unpaired": 0,
286             "busy": 0,
287             "idle": 0,
288             "fail": 0,
289             "down": 0,
290             "shutdown": 0
291         }
292         for s in states:
293             counts[s] = counts[s] + 1
294         return counts
295
296     def _nodes_up(self, counts):
297         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
298         return up
299
300     def _total_price(self):
301         cost = 0
302         cost += sum(self.sizes_booting[c].price
303                     for c in self.booting.iterkeys())
304         cost += sum(c.cloud_node.size.price
305                     for c in self.cloud_nodes.nodes.itervalues())
306         return cost
307
308     def _size_wishlist(self, size):
309         return sum(1 for c in self.last_wishlist if c.id == size.id)
310
311     def _nodes_wanted(self, size):
312         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
313         under_min = self.min_nodes - total_node_count
314         over_max = total_node_count - self.node_quota
315         total_price = self._total_price()
316
317         counts = self._state_counts(size)
318
319         up_count = self._nodes_up(counts)
320         busy_count = counts["busy"]
321         wishlist_count = self._size_wishlist(size)
322
323         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
324                           wishlist_count,
325                           up_count,
326                           counts["booting"],
327                           counts["unpaired"],
328                           counts["idle"],
329                           busy_count,
330                           counts["down"]+counts["fail"],
331                           counts["shutdown"])
332
333         if over_max >= 0:
334             return -over_max
335         elif under_min > 0 and size.id == self.min_cloud_size.id:
336             return under_min
337
338         wanted = wishlist_count - (up_count - busy_count)
339         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
340             can_boot = int((self.max_total_price - total_price) / size.price)
341             if can_boot == 0:
342                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
343                                   size.name, size.price, self.max_total_price, total_price)
344             return can_boot
345         else:
346             return wanted
347
348     def _nodes_excess(self, size):
349         counts = self._state_counts(size)
350         up_count = self._nodes_up(counts)
351         if size.id == self.min_cloud_size.id:
352             up_count -= self.min_nodes
353         return up_count - (counts["busy"] + self._size_wishlist(size))
354
355     def update_server_wishlist(self, wishlist):
356         self._update_poll_time('server_wishlist')
357         requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
358         self.last_wishlist = wishlist[:requestable_nodes]
359         for size in reversed(self.server_calculator.cloud_sizes):
360             try:
361                 nodes_wanted = self._nodes_wanted(size)
362                 if nodes_wanted > 0:
363                     self._later.start_node(size)
364                 elif (nodes_wanted < 0) and self.booting:
365                     self._later.stop_booting_node(size)
366             except Exception:
367                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
368         try:
369             self._update_tracker()
370         except:
371             self._logger.exception("while updating tracker")
372
373     def _check_poll_freshness(orig_func):
374         """Decorator to inhibit a method when poll information is stale.
375
376         This decorator checks the timestamps of all the poll information the
377         daemon has received.  The decorated method is only called if none
378         of the timestamps are considered stale.
379         """
380         @functools.wraps(orig_func)
381         def wrapper(self, *args, **kwargs):
382             now = time.time()
383             if all(now - t < self.poll_stale_after
384                    for t in self.last_polls.itervalues()):
385                 return orig_func(self, *args, **kwargs)
386             else:
387                 return None
388         return wrapper
389
390     @_check_poll_freshness
391     def start_node(self, cloud_size):
392         nodes_wanted = self._nodes_wanted(cloud_size)
393         if nodes_wanted < 1:
394             return None
395         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
396         self._logger.info("Want %i more %s nodes.  Booting a node.",
397                           nodes_wanted, cloud_size.name)
398         new_setup = self._node_setup.start(
399             timer_actor=self._timer,
400             arvados_client=self._new_arvados(),
401             arvados_node=arvados_node,
402             cloud_client=self._new_cloud(),
403             cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
404         self.booting[new_setup.actor_ref.actor_urn] = new_setup
405         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
406
407         if arvados_node is not None:
408             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
409                 time.time())
410         new_setup.subscribe(self._later.node_setup_finished)
411         if nodes_wanted > 1:
412             self._later.start_node(cloud_size)
413
414     def _get_actor_attrs(self, actor, *attr_names):
415         return pykka.get_all([getattr(actor, name) for name in attr_names])
416
417     def node_setup_finished(self, setup_proxy):
418         # Called when a SetupActor has completed.
419         cloud_node, arvados_node, error = self._get_actor_attrs(
420             setup_proxy, 'cloud_node', 'arvados_node', 'error')
421         setup_proxy.stop()
422
423         if cloud_node is None:
424             # If cloud_node is None then the node create wasn't successful.
425             if error == dispatch.QuotaExceeded:
426                 # We've hit a quota limit, so adjust node_quota to stop trying to
427                 # boot new nodes until the node count goes down.
428                 self.node_quota = len(self.cloud_nodes)
429                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
430         else:
431             # Node creation succeeded.  Update cloud node list.
432             cloud_node._nodemanager_recently_booted = True
433             self._register_cloud_node(cloud_node)
434
435             # Different quota policies may in force depending on the cloud
436             # provider, account limits, and the specific mix of nodes sizes
437             # that are already created.  If we are right at the quota limit,
438             # we want to probe to see if the last quota still applies or if we
439             # are allowed to create more nodes.
440             #
441             # For example, if the quota is actually based on core count, the
442             # quota might be 20 single-core machines or 10 dual-core machines.
443             # If we previously set node_quota to 10 dual core machines, but are
444             # now booting single core machines (actual quota 20), we want to
445             # allow the quota to expand so we don't get stuck at 10 machines
446             # forever.
447             if len(self.cloud_nodes) >= self.node_quota:
448                 self.node_quota = len(self.cloud_nodes)+1
449                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
450
451         self.node_quota = min(self.node_quota, self.max_nodes)
452         del self.booting[setup_proxy.actor_ref.actor_urn]
453         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
454
455     @_check_poll_freshness
456     def stop_booting_node(self, size):
457         nodes_excess = self._nodes_excess(size)
458         if (nodes_excess < 1) or not self.booting:
459             return None
460         for key, node in self.booting.iteritems():
461             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
462                 del self.booting[key]
463                 del self.sizes_booting[key]
464
465                 if nodes_excess > 1:
466                     self._later.stop_booting_node(size)
467                 break
468
469     def _begin_node_shutdown(self, node_actor, cancellable):
470         cloud_node_obj = node_actor.cloud_node.get()
471         cloud_node_id = cloud_node_obj.id
472         record = self.cloud_nodes[cloud_node_id]
473         if record.shutdown_actor is not None:
474             return None
475         shutdown = self._node_shutdown.start(
476             timer_actor=self._timer, cloud_client=self._new_cloud(),
477             arvados_client=self._new_arvados(),
478             node_monitor=node_actor.actor_ref, cancellable=cancellable)
479         record.shutdown_actor = shutdown.proxy()
480         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
481
482     @_check_poll_freshness
483     def node_can_shutdown(self, node_actor):
484         try:
485             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
486                 self._begin_node_shutdown(node_actor, cancellable=True)
487             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
488                 # Node is unpaired, which means it probably exceeded its booting
489                 # grace period without a ping, so shut it down so we can boot a new
490                 # node in its place.
491                 self._begin_node_shutdown(node_actor, cancellable=False)
492             elif node_actor.in_state('down', 'fail').get():
493                 # Node is down and unlikely to come back.
494                 self._begin_node_shutdown(node_actor, cancellable=False)
495         except pykka.ActorDeadError as e:
496             # The monitor actor sends shutdown suggestions every time the
497             # node's state is updated, and these go into the daemon actor's
498             # message queue.  It's possible that the node has already been shut
499             # down (which shuts down the node monitor actor).  In that case,
500             # this message is stale and we'll get ActorDeadError when we try to
501             # access node_actor.  Log the error.
502             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
503
504     def node_finished_shutdown(self, shutdown_actor):
505         try:
506             cloud_node, success = self._get_actor_attrs(
507                 shutdown_actor, 'cloud_node', 'success')
508         except pykka.ActorDeadError:
509             return
510         cloud_node_id = cloud_node.id
511
512         try:
513             shutdown_actor.stop()
514         except pykka.ActorDeadError:
515             pass
516
517         try:
518             record = self.cloud_nodes[cloud_node_id]
519         except KeyError:
520             # Cloud node was already removed from the cloud node list
521             # supposedly while the destroy_node call was finishing its
522             # job.
523             return
524         record.shutdown_actor = None
525
526         if not success:
527             return
528
529         # Shutdown was successful, so stop the monitor actor, otherwise it
530         # will keep offering the node as a candidate for shutdown.
531         record.actor.stop()
532         record.actor = None
533
534         # If the node went from being booted to being shut down without ever
535         # appearing in the cloud node list, it will have the
536         # _nodemanager_recently_booted tag, so get rid of it so that the node
537         # can be forgotten completely.
538         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
539             del record.cloud_node._nodemanager_recently_booted
540
541     def shutdown(self):
542         self._logger.info("Shutting down after signal.")
543         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
544
545         # Shut down pollers
546         self._server_wishlist_actor.stop()
547         self._arvados_nodes_actor.stop()
548         self._cloud_nodes_actor.stop()
549
550         # Clear cloud node list
551         self.update_cloud_nodes([])
552
553         # Stop setup actors unless they are in the middle of setup.
554         setup_stops = {key: node.stop_if_no_cloud_node()
555                        for key, node in self.booting.iteritems()}
556         self.booting = {key: self.booting[key]
557                         for key in setup_stops if not setup_stops[key].get()}
558         self._later.await_shutdown()
559
560     def await_shutdown(self):
561         if self.booting:
562             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
563         else:
564             self.stop()