12085: Add node_quota status & count different cloud errors separately.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         # Try to select a stale node record that have an assigned slot first
82         for record in sorted(self.nodes.itervalues(),
83                              key=lambda r: r.arvados_node['slot_number'],
84                              reverse=True):
85             node = record.arvados_node
86             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87                                           stale_time) and
88                   not cnode.timestamp_fresh(record.assignment_time,
89                                             stale_time)):
90                 return node
91         return None
92
93
94 class NodeManagerDaemonActor(actor_class):
95     """Node Manager daemon.
96
97     This actor subscribes to all information polls about cloud nodes,
98     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
99     for every cloud node, subscribing them to poll updates
100     appropriately.  It creates and destroys cloud nodes based on job queue
101     demand, and stops the corresponding ComputeNode actors when their work
102     is done.
103     """
104     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105                  cloud_nodes_actor, cloud_update_actor, timer_actor,
106                  arvados_factory, cloud_factory,
107                  shutdown_windows, server_calculator,
108                  min_nodes, max_nodes,
109                  poll_stale_after=600,
110                  boot_fail_after=1800,
111                  node_stale_after=7200,
112                  node_setup_class=dispatch.ComputeNodeSetupActor,
113                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114                  node_actor_class=dispatch.ComputeNodeMonitorActor,
115                  max_total_price=0):
116         super(NodeManagerDaemonActor, self).__init__()
117         self._node_setup = node_setup_class
118         self._node_shutdown = node_shutdown_class
119         self._node_actor = node_actor_class
120         self._cloud_updater = cloud_update_actor
121         self._timer = timer_actor
122         self._new_arvados = arvados_factory
123         self._new_cloud = cloud_factory
124         self._cloud_driver = self._new_cloud()
125         self._later = self.actor_ref.tell_proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.node_quota = max_nodes
132         self.max_total_price = max_total_price
133         self.poll_stale_after = poll_stale_after
134         self.boot_fail_after = boot_fail_after
135         self.node_stale_after = node_stale_after
136         self.last_polls = {}
137         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
138             poll_actor = locals()[poll_name + '_actor']
139             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
140             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
141             self.last_polls[poll_name] = -self.poll_stale_after
142         self.cloud_nodes = _CloudNodeTracker()
143         self.arvados_nodes = _ArvadosNodeTracker()
144         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
145         self.sizes_booting = {} # Actor IDs to node size
146
147     def on_start(self):
148         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149         self._logger.debug("Daemon started")
150
151     def _update_poll_time(self, poll_key):
152         self.last_polls[poll_key] = time.time()
153
154     def _pair_nodes(self, node_record, arvados_node):
155         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
156                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
157         self._arvados_nodes_actor.subscribe_to(
158             arvados_node['uuid'], node_record.actor.update_arvados_node)
159         node_record.arvados_node = arvados_node
160         self.arvados_nodes.add(node_record)
161
162     def _new_node(self, cloud_node):
163         start_time = self._cloud_driver.node_start_time(cloud_node)
164         shutdown_timer = cnode.ShutdownTimer(start_time,
165                                              self.shutdown_windows)
166         actor = self._node_actor.start(
167             cloud_node=cloud_node,
168             cloud_node_start_time=start_time,
169             shutdown_timer=shutdown_timer,
170             update_actor=self._cloud_updater,
171             timer_actor=self._timer,
172             arvados_node=None,
173             poll_stale_after=self.poll_stale_after,
174             node_stale_after=self.node_stale_after,
175             cloud_client=self._cloud_driver,
176             boot_fail_after=self.boot_fail_after)
177         actorTell = actor.tell_proxy()
178         actorTell.subscribe(self._later.node_can_shutdown)
179         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
180                                              actorTell.update_cloud_node)
181         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
182         return record
183
184     def _register_cloud_node(self, node):
185         rec = self.cloud_nodes.get(node.id)
186         if rec is None:
187             self._logger.info("Registering new cloud node %s", node.id)
188             record = self._new_node(node)
189             self.cloud_nodes.add(record)
190         else:
191             rec.cloud_node = node
192
193     def update_cloud_nodes(self, nodelist):
194         self._update_poll_time('cloud_nodes')
195         for _, node in self.cloud_nodes.update_from(nodelist):
196             self._register_cloud_node(node)
197
198         self.try_pairing()
199
200         for record in self.cloud_nodes.orphans.itervalues():
201             if record.shutdown_actor:
202                 try:
203                     record.shutdown_actor.stop()
204                 except pykka.ActorDeadError:
205                     pass
206                 record.shutdown_actor = None
207
208             # A recently booted node is a node that successfully completed the
209             # setup actor but has not yet appeared in the cloud node list.
210             # This will have the tag _nodemanager_recently_booted on it, which
211             # means (if we're not shutting it down) we want to put it back into
212             # the cloud node list.  Once it really appears in the cloud list,
213             # the object in record.cloud_node will be replaced by a new one
214             # that lacks the "_nodemanager_recently_booted" tag.
215             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
216                 self.cloud_nodes.add(record)
217             else:
218                 # Node disappeared from the cloud node list.  Stop the monitor
219                 # actor if necessary and forget about the node.
220                 if record.actor:
221                     try:
222                         record.actor.stop()
223                     except pykka.ActorDeadError:
224                         pass
225                     record.actor = None
226                 record.cloud_node = None
227
228     def _register_arvados_node(self, key, arv_node):
229         self._logger.info("Registering new Arvados node %s", key)
230         record = _ComputeNodeRecord(arvados_node=arv_node)
231         self.arvados_nodes.add(record)
232
233     def update_arvados_nodes(self, nodelist):
234         self._update_poll_time('arvados_nodes')
235         for key, node in self.arvados_nodes.update_from(nodelist):
236             self._register_arvados_node(key, node)
237         self.try_pairing()
238
239     def try_pairing(self):
240         for record in self.cloud_nodes.unpaired():
241             for arv_rec in self.arvados_nodes.unpaired():
242                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
243                     self._pair_nodes(record, arv_rec.arvados_node)
244                     break
245
246     def _nodes_booting(self, size):
247         s = sum(1
248                 for c in self.booting.iterkeys()
249                 if size is None or self.sizes_booting[c].id == size.id)
250         return s
251
252     def _node_states(self, size):
253         proxy_states = []
254         states = []
255         for rec in self.cloud_nodes.nodes.itervalues():
256             if size is None or rec.cloud_node.size.id == size.id:
257                 if rec.shutdown_actor is None and rec.actor is not None:
258                     proxy_states.append(rec.actor.get_state())
259                 else:
260                     states.append("shutdown")
261         return states + pykka.get_all(proxy_states)
262
263     def _update_tracker(self):
264         updates = {
265             k: 0
266             for k in status.tracker.keys()
267             if k.startswith('nodes_')
268         }
269         for s in self._node_states(size=None):
270             updates.setdefault('nodes_'+s, 0)
271             updates['nodes_'+s] += 1
272         updates['nodes_wish'] = len(self.last_wishlist)
273         updates['node_quota'] = self.node_quota
274         status.tracker.update(updates)
275
276     def _state_counts(self, size):
277         states = self._node_states(size)
278         counts = {
279             "booting": self._nodes_booting(size),
280             "unpaired": 0,
281             "busy": 0,
282             "idle": 0,
283             "fail": 0,
284             "down": 0,
285             "shutdown": 0
286         }
287         for s in states:
288             counts[s] = counts[s] + 1
289         return counts
290
291     def _nodes_up(self, counts):
292         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
293         return up
294
295     def _total_price(self):
296         cost = 0
297         cost += sum(self.sizes_booting[c].price
298                     for c in self.booting.iterkeys())
299         cost += sum(c.cloud_node.size.price
300                     for c in self.cloud_nodes.nodes.itervalues())
301         return cost
302
303     def _size_wishlist(self, size):
304         return sum(1 for c in self.last_wishlist if c.id == size.id)
305
306     def _nodes_wanted(self, size):
307         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
308         under_min = self.min_nodes - total_node_count
309         over_max = total_node_count - self.node_quota
310         total_price = self._total_price()
311
312         counts = self._state_counts(size)
313
314         up_count = self._nodes_up(counts)
315         busy_count = counts["busy"]
316         wishlist_count = self._size_wishlist(size)
317
318         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
319                           wishlist_count,
320                           up_count,
321                           counts["booting"],
322                           counts["unpaired"],
323                           counts["idle"],
324                           busy_count,
325                           counts["down"]+counts["fail"],
326                           counts["shutdown"])
327
328         if over_max >= 0:
329             return -over_max
330         elif under_min > 0 and size.id == self.min_cloud_size.id:
331             return under_min
332
333         wanted = wishlist_count - (up_count - busy_count)
334         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
335             can_boot = int((self.max_total_price - total_price) / size.price)
336             if can_boot == 0:
337                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
338                                   size.name, size.price, self.max_total_price, total_price)
339             return can_boot
340         else:
341             return wanted
342
343     def _nodes_excess(self, size):
344         counts = self._state_counts(size)
345         up_count = self._nodes_up(counts)
346         if size.id == self.min_cloud_size.id:
347             up_count -= self.min_nodes
348         return up_count - (counts["busy"] + self._size_wishlist(size))
349
350     def update_server_wishlist(self, wishlist):
351         self._update_poll_time('server_wishlist')
352         requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
353         self.last_wishlist = wishlist[:requestable_nodes]
354         for size in reversed(self.server_calculator.cloud_sizes):
355             try:
356                 nodes_wanted = self._nodes_wanted(size)
357                 if nodes_wanted > 0:
358                     self._later.start_node(size)
359                 elif (nodes_wanted < 0) and self.booting:
360                     self._later.stop_booting_node(size)
361             except Exception:
362                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
363         try:
364             self._update_tracker()
365         except:
366             self._logger.exception("while updating tracker")
367
368     def _check_poll_freshness(orig_func):
369         """Decorator to inhibit a method when poll information is stale.
370
371         This decorator checks the timestamps of all the poll information the
372         daemon has received.  The decorated method is only called if none
373         of the timestamps are considered stale.
374         """
375         @functools.wraps(orig_func)
376         def wrapper(self, *args, **kwargs):
377             now = time.time()
378             if all(now - t < self.poll_stale_after
379                    for t in self.last_polls.itervalues()):
380                 return orig_func(self, *args, **kwargs)
381             else:
382                 return None
383         return wrapper
384
385     @_check_poll_freshness
386     def start_node(self, cloud_size):
387         nodes_wanted = self._nodes_wanted(cloud_size)
388         if nodes_wanted < 1:
389             return None
390         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
391         self._logger.info("Want %i more %s nodes.  Booting a node.",
392                           nodes_wanted, cloud_size.name)
393         new_setup = self._node_setup.start(
394             timer_actor=self._timer,
395             arvados_client=self._new_arvados(),
396             arvados_node=arvados_node,
397             cloud_client=self._new_cloud(),
398             cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
399         self.booting[new_setup.actor_ref.actor_urn] = new_setup
400         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
401
402         if arvados_node is not None:
403             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
404                 time.time())
405         new_setup.subscribe(self._later.node_setup_finished)
406         if nodes_wanted > 1:
407             self._later.start_node(cloud_size)
408
409     def _get_actor_attrs(self, actor, *attr_names):
410         return pykka.get_all([getattr(actor, name) for name in attr_names])
411
412     def node_setup_finished(self, setup_proxy):
413         # Called when a SetupActor has completed.
414         cloud_node, arvados_node, error = self._get_actor_attrs(
415             setup_proxy, 'cloud_node', 'arvados_node', 'error')
416         setup_proxy.stop()
417
418         if cloud_node is None:
419             # If cloud_node is None then the node create wasn't successful.
420             if error == dispatch.QuotaExceeded:
421                 # We've hit a quota limit, so adjust node_quota to stop trying to
422                 # boot new nodes until the node count goes down.
423                 self.node_quota = len(self.cloud_nodes)
424                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
425         else:
426             # Node creation succeeded.  Update cloud node list.
427             cloud_node._nodemanager_recently_booted = True
428             self._register_cloud_node(cloud_node)
429
430             # Different quota policies may in force depending on the cloud
431             # provider, account limits, and the specific mix of nodes sizes
432             # that are already created.  If we are right at the quota limit,
433             # we want to probe to see if the last quota still applies or if we
434             # are allowed to create more nodes.
435             #
436             # For example, if the quota is actually based on core count, the
437             # quota might be 20 single-core machines or 10 dual-core machines.
438             # If we previously set node_quota to 10 dual core machines, but are
439             # now booting single core machines (actual quota 20), we want to
440             # allow the quota to expand so we don't get stuck at 10 machines
441             # forever.
442             if len(self.cloud_nodes) >= self.node_quota:
443                 self.node_quota = len(self.cloud_nodes)+1
444                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
445
446         self.node_quota = min(self.node_quota, self.max_nodes)
447         del self.booting[setup_proxy.actor_ref.actor_urn]
448         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
449
450     @_check_poll_freshness
451     def stop_booting_node(self, size):
452         nodes_excess = self._nodes_excess(size)
453         if (nodes_excess < 1) or not self.booting:
454             return None
455         for key, node in self.booting.iteritems():
456             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
457                 del self.booting[key]
458                 del self.sizes_booting[key]
459
460                 if nodes_excess > 1:
461                     self._later.stop_booting_node(size)
462                 break
463
464     def _begin_node_shutdown(self, node_actor, cancellable):
465         cloud_node_obj = node_actor.cloud_node.get()
466         cloud_node_id = cloud_node_obj.id
467         record = self.cloud_nodes[cloud_node_id]
468         if record.shutdown_actor is not None:
469             return None
470         shutdown = self._node_shutdown.start(
471             timer_actor=self._timer, cloud_client=self._new_cloud(),
472             arvados_client=self._new_arvados(),
473             node_monitor=node_actor.actor_ref, cancellable=cancellable)
474         record.shutdown_actor = shutdown.proxy()
475         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
476
477     @_check_poll_freshness
478     def node_can_shutdown(self, node_actor):
479         try:
480             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
481                 self._begin_node_shutdown(node_actor, cancellable=True)
482             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
483                 # Node is unpaired, which means it probably exceeded its booting
484                 # grace period without a ping, so shut it down so we can boot a new
485                 # node in its place.
486                 self._begin_node_shutdown(node_actor, cancellable=False)
487             elif node_actor.in_state('down', 'fail').get():
488                 # Node is down and unlikely to come back.
489                 self._begin_node_shutdown(node_actor, cancellable=False)
490         except pykka.ActorDeadError as e:
491             # The monitor actor sends shutdown suggestions every time the
492             # node's state is updated, and these go into the daemon actor's
493             # message queue.  It's possible that the node has already been shut
494             # down (which shuts down the node monitor actor).  In that case,
495             # this message is stale and we'll get ActorDeadError when we try to
496             # access node_actor.  Log the error.
497             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
498
499     def node_finished_shutdown(self, shutdown_actor):
500         try:
501             cloud_node, success = self._get_actor_attrs(
502                 shutdown_actor, 'cloud_node', 'success')
503         except pykka.ActorDeadError:
504             return
505         cloud_node_id = cloud_node.id
506
507         try:
508             shutdown_actor.stop()
509         except pykka.ActorDeadError:
510             pass
511
512         try:
513             record = self.cloud_nodes[cloud_node_id]
514         except KeyError:
515             # Cloud node was already removed from the cloud node list
516             # supposedly while the destroy_node call was finishing its
517             # job.
518             return
519         record.shutdown_actor = None
520
521         if not success:
522             return
523
524         # Shutdown was successful, so stop the monitor actor, otherwise it
525         # will keep offering the node as a candidate for shutdown.
526         record.actor.stop()
527         record.actor = None
528
529         # If the node went from being booted to being shut down without ever
530         # appearing in the cloud node list, it will have the
531         # _nodemanager_recently_booted tag, so get rid of it so that the node
532         # can be forgotten completely.
533         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
534             del record.cloud_node._nodemanager_recently_booted
535
536     def shutdown(self):
537         self._logger.info("Shutting down after signal.")
538         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
539
540         # Shut down pollers
541         self._server_wishlist_actor.stop()
542         self._arvados_nodes_actor.stop()
543         self._cloud_nodes_actor.stop()
544
545         # Clear cloud node list
546         self.update_cloud_nodes([])
547
548         # Stop setup actors unless they are in the middle of setup.
549         setup_stops = {key: node.stop_if_no_cloud_node()
550                        for key, node in self.booting.iteritems()}
551         self.booting = {key: self.booting[key]
552                         for key in setup_stops if not setup_stops[key].get()}
553         self._later.await_shutdown()
554
555     def await_shutdown(self):
556         if self.booting:
557             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
558         else:
559             self.stop()