15003: Fix import cycle.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         # Try to select a stale node record that have an assigned slot first
82         for record in sorted(self.nodes.itervalues(),
83                              key=lambda r: r.arvados_node['slot_number'],
84                              reverse=True):
85             node = record.arvados_node
86             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87                                           stale_time) and
88                   not cnode.timestamp_fresh(record.assignment_time,
89                                             stale_time)):
90                 return node
91         return None
92
93
94 class NodeManagerDaemonActor(actor_class):
95     """Node Manager daemon.
96
97     This actor subscribes to all information polls about cloud nodes,
98     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
99     for every cloud node, subscribing them to poll updates
100     appropriately.  It creates and destroys cloud nodes based on job queue
101     demand, and stops the corresponding ComputeNode actors when their work
102     is done.
103     """
104     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105                  cloud_nodes_actor, cloud_update_actor, timer_actor,
106                  arvados_factory, cloud_factory,
107                  shutdown_windows, server_calculator,
108                  min_nodes, max_nodes,
109                  poll_stale_after=600,
110                  boot_fail_after=1800,
111                  node_stale_after=7200,
112                  node_setup_class=dispatch.ComputeNodeSetupActor,
113                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114                  node_actor_class=dispatch.ComputeNodeMonitorActor,
115                  max_total_price=0,
116                  consecutive_idle_count=1):
117         super(NodeManagerDaemonActor, self).__init__()
118         self._node_setup = node_setup_class
119         self._node_shutdown = node_shutdown_class
120         self._node_actor = node_actor_class
121         self._cloud_updater = cloud_update_actor
122         self._timer = timer_actor
123         self._new_arvados = arvados_factory
124         self._new_cloud = cloud_factory
125         self._cloud_driver = self._new_cloud()
126         self._later = self.actor_ref.tell_proxy()
127         self.shutdown_windows = shutdown_windows
128         self.server_calculator = server_calculator
129         self.min_cloud_size = self.server_calculator.cheapest_size()
130         self.min_nodes = min_nodes
131         self.max_nodes = max_nodes
132         self.node_quota = max_nodes
133         self.max_total_price = max_total_price
134         self.poll_stale_after = poll_stale_after
135         self.boot_fail_after = boot_fail_after
136         self.node_stale_after = node_stale_after
137         self.consecutive_idle_count = consecutive_idle_count
138         self.last_polls = {}
139         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
140             poll_actor = locals()[poll_name + '_actor']
141             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
142             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
143             self.last_polls[poll_name] = -self.poll_stale_after
144         self.cloud_nodes = _CloudNodeTracker()
145         self.arvados_nodes = _ArvadosNodeTracker()
146         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
147         self.sizes_booting = {} # Actor IDs to node size
148
149     def on_start(self):
150         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
151         self._logger.debug("Daemon started")
152
153     def _update_poll_time(self, poll_key):
154         self.last_polls[poll_key] = time.time()
155
156     def _pair_nodes(self, node_record, arvados_node):
157         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
158                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
159         self._arvados_nodes_actor.subscribe_to(
160             arvados_node['uuid'], node_record.actor.update_arvados_node)
161         node_record.arvados_node = arvados_node
162         self.arvados_nodes.add(node_record)
163
164     def _new_node(self, cloud_node):
165         start_time = self._cloud_driver.node_start_time(cloud_node)
166         shutdown_timer = cnode.ShutdownTimer(start_time,
167                                              self.shutdown_windows)
168         actor = self._node_actor.start(
169             cloud_node=cloud_node,
170             cloud_node_start_time=start_time,
171             shutdown_timer=shutdown_timer,
172             update_actor=self._cloud_updater,
173             timer_actor=self._timer,
174             arvados_node=None,
175             poll_stale_after=self.poll_stale_after,
176             node_stale_after=self.node_stale_after,
177             cloud_client=self._cloud_driver,
178             boot_fail_after=self.boot_fail_after,
179             consecutive_idle_count=self.consecutive_idle_count)
180         actorTell = actor.tell_proxy()
181         actorTell.subscribe(self._later.node_can_shutdown)
182         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
183                                              actorTell.update_cloud_node)
184         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
185         return record
186
187     def _register_cloud_node(self, node):
188         rec = self.cloud_nodes.get(node.id)
189         if rec is None:
190             self._logger.info("Registering new cloud node %s", node.id)
191             record = self._new_node(node)
192             self.cloud_nodes.add(record)
193         else:
194             rec.cloud_node = node
195
196     def update_cloud_nodes(self, nodelist):
197         self._update_poll_time('cloud_nodes')
198         for _, node in self.cloud_nodes.update_from(nodelist):
199             self._register_cloud_node(node)
200
201         self.try_pairing()
202
203         for record in self.cloud_nodes.orphans.itervalues():
204             if record.shutdown_actor:
205                 try:
206                     record.shutdown_actor.stop()
207                 except pykka.ActorDeadError:
208                     pass
209                 record.shutdown_actor = None
210
211             # A recently booted node is a node that successfully completed the
212             # setup actor but has not yet appeared in the cloud node list.
213             # This will have the tag _nodemanager_recently_booted on it, which
214             # means (if we're not shutting it down) we want to put it back into
215             # the cloud node list.  Once it really appears in the cloud list,
216             # the object in record.cloud_node will be replaced by a new one
217             # that lacks the "_nodemanager_recently_booted" tag.
218             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
219                 self.cloud_nodes.add(record)
220             else:
221                 # Node disappeared from the cloud node list. If it's paired,
222                 # remove its idle time counter.
223                 if record.arvados_node:
224                     status.tracker.idle_out(record.arvados_node.get('hostname'))
225                 # Stop the monitor actor if necessary and forget about the node.
226                 if record.actor:
227                     try:
228                         record.actor.stop()
229                     except pykka.ActorDeadError:
230                         pass
231                     record.actor = None
232                 record.cloud_node = None
233
234     def _register_arvados_node(self, key, arv_node):
235         self._logger.info("Registering new Arvados node %s", key)
236         record = _ComputeNodeRecord(arvados_node=arv_node)
237         self.arvados_nodes.add(record)
238
239     def update_arvados_nodes(self, nodelist):
240         self._update_poll_time('arvados_nodes')
241         for key, node in self.arvados_nodes.update_from(nodelist):
242             self._register_arvados_node(key, node)
243         self.try_pairing()
244
245     def try_pairing(self):
246         for record in self.cloud_nodes.unpaired():
247             for arv_rec in self.arvados_nodes.unpaired():
248                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
249                     self._pair_nodes(record, arv_rec.arvados_node)
250                     break
251
252     def _nodes_booting(self, size):
253         s = sum(1
254                 for c in self.booting.iterkeys()
255                 if size is None or self.sizes_booting[c].id == size.id)
256         return s
257
258     def _node_states(self, size):
259         proxy_states = []
260         states = []
261         for rec in self.cloud_nodes.nodes.itervalues():
262             if size is None or rec.cloud_node.size.id == size.id:
263                 if rec.shutdown_actor is None and rec.actor is not None:
264                     proxy_states.append(rec.actor.get_state())
265                 else:
266                     states.append("shutdown")
267         return states + pykka.get_all(proxy_states)
268
269     def _update_tracker(self):
270         updates = {
271             k: 0
272             for k in status.tracker.keys()
273             if k.startswith('nodes_')
274         }
275         for s in self._node_states(size=None):
276             updates.setdefault('nodes_'+s, 0)
277             updates['nodes_'+s] += 1
278         updates['nodes_wish'] = len(self.last_wishlist)
279         updates['node_quota'] = self.node_quota
280         status.tracker.update(updates)
281
282     def _state_counts(self, size):
283         states = self._node_states(size)
284         counts = {
285             "booting": self._nodes_booting(size),
286             "unpaired": 0,
287             "busy": 0,
288             "idle": 0,
289             "fail": 0,
290             "down": 0,
291             "shutdown": 0
292         }
293         for s in states:
294             counts[s] = counts[s] + 1
295         return counts
296
297     def _nodes_up(self, counts):
298         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
299         return up
300
301     def _total_price(self):
302         cost = 0
303         cost += sum(self.sizes_booting[c].price
304                     for c in self.booting.iterkeys())
305         cost += sum(c.cloud_node.size.price
306                     for c in self.cloud_nodes.nodes.itervalues())
307         return cost
308
309     def _size_wishlist(self, size):
310         return sum(1 for c in self.last_wishlist if c.id == size.id)
311
312     def _nodes_wanted(self, size):
313         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
314         under_min = self.min_nodes - total_node_count
315         over_max = total_node_count - self.node_quota
316         total_price = self._total_price()
317
318         counts = self._state_counts(size)
319
320         up_count = self._nodes_up(counts)
321         busy_count = counts["busy"]
322         wishlist_count = self._size_wishlist(size)
323
324         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
325                           wishlist_count,
326                           up_count,
327                           counts["booting"],
328                           counts["unpaired"],
329                           counts["idle"],
330                           busy_count,
331                           counts["down"]+counts["fail"],
332                           counts["shutdown"])
333
334         if over_max >= 0:
335             return -over_max
336         elif under_min > 0 and size.id == self.min_cloud_size.id:
337             return under_min
338
339         wanted = wishlist_count - (up_count - busy_count)
340         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
341             can_boot = int((self.max_total_price - total_price) / size.price)
342             if can_boot == 0:
343                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
344                                   size.id, size.price, self.max_total_price, total_price)
345             return can_boot
346         else:
347             return wanted
348
349     def _nodes_excess(self, size):
350         counts = self._state_counts(size)
351         up_count = self._nodes_up(counts)
352         if size.id == self.min_cloud_size.id:
353             up_count -= self.min_nodes
354         return up_count - (counts["busy"] + self._size_wishlist(size))
355
356     def update_server_wishlist(self, wishlist):
357         self._update_poll_time('server_wishlist')
358         requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
359         self.last_wishlist = wishlist[:requestable_nodes]
360         for size in reversed(self.server_calculator.cloud_sizes):
361             try:
362                 nodes_wanted = self._nodes_wanted(size)
363                 if nodes_wanted > 0:
364                     self._later.start_node(size)
365                 elif (nodes_wanted < 0) and self.booting:
366                     self._later.stop_booting_node(size)
367             except Exception:
368                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
369         try:
370             self._update_tracker()
371         except:
372             self._logger.exception("while updating tracker")
373
374     def _check_poll_freshness(orig_func):
375         """Decorator to inhibit a method when poll information is stale.
376
377         This decorator checks the timestamps of all the poll information the
378         daemon has received.  The decorated method is only called if none
379         of the timestamps are considered stale.
380         """
381         @functools.wraps(orig_func)
382         def wrapper(self, *args, **kwargs):
383             now = time.time()
384             if all(now - t < self.poll_stale_after
385                    for t in self.last_polls.itervalues()):
386                 return orig_func(self, *args, **kwargs)
387             else:
388                 return None
389         return wrapper
390
391     @_check_poll_freshness
392     def start_node(self, cloud_size):
393         nodes_wanted = self._nodes_wanted(cloud_size)
394         if nodes_wanted < 1:
395             return None
396
397         if not self.cancel_node_shutdown(cloud_size):
398             arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
399             self._logger.info("Want %i more %s nodes.  Booting a node.",
400                               nodes_wanted, cloud_size.id)
401             new_setup = self._node_setup.start(
402                 timer_actor=self._timer,
403                 arvados_client=self._new_arvados(),
404                 arvados_node=arvados_node,
405                 cloud_client=self._new_cloud(),
406                 cloud_size=self.server_calculator.find_size(cloud_size.id))
407             self.booting[new_setup.actor_urn] = new_setup.proxy()
408             self.sizes_booting[new_setup.actor_urn] = cloud_size
409
410             if arvados_node is not None:
411                 self.arvados_nodes[arvados_node['uuid']].assignment_time = (
412                     time.time())
413             new_setup.tell_proxy().subscribe(self._later.node_setup_finished)
414
415         if nodes_wanted > 1:
416             self._later.start_node(cloud_size)
417
418     def _get_actor_attrs(self, actor, *attr_names):
419         return pykka.get_all([getattr(actor, name) for name in attr_names])
420
421     def node_setup_finished(self, setup_proxy):
422         # Called when a SetupActor has completed.
423         cloud_node, arvados_node, error = self._get_actor_attrs(
424             setup_proxy, 'cloud_node', 'arvados_node', 'error')
425         setup_proxy.stop()
426
427         if cloud_node is None:
428             # If cloud_node is None then the node create wasn't successful.
429             if error == dispatch.QuotaExceeded:
430                 # We've hit a quota limit, so adjust node_quota to stop trying to
431                 # boot new nodes until the node count goes down.
432                 self.node_quota = len(self.cloud_nodes)
433                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
434         else:
435             # Node creation succeeded.  Update cloud node list.
436             cloud_node._nodemanager_recently_booted = True
437             self._register_cloud_node(cloud_node)
438
439             # Different quota policies may in force depending on the cloud
440             # provider, account limits, and the specific mix of nodes sizes
441             # that are already created.  If we are right at the quota limit,
442             # we want to probe to see if the last quota still applies or if we
443             # are allowed to create more nodes.
444             #
445             # For example, if the quota is actually based on core count, the
446             # quota might be 20 single-core machines or 10 dual-core machines.
447             # If we previously set node_quota to 10 dual core machines, but are
448             # now booting single core machines (actual quota 20), we want to
449             # allow the quota to expand so we don't get stuck at 10 machines
450             # forever.
451             if len(self.cloud_nodes) >= self.node_quota:
452                 self.node_quota = len(self.cloud_nodes)+1
453                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
454
455         self.node_quota = min(self.node_quota, self.max_nodes)
456         del self.booting[setup_proxy.actor_ref.actor_urn]
457         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
458
459     @_check_poll_freshness
460     def stop_booting_node(self, size):
461         nodes_excess = self._nodes_excess(size)
462         if (nodes_excess < 1) or not self.booting:
463             return None
464         for key, node in self.booting.iteritems():
465             try:
466                 if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get(2):
467                     del self.booting[key]
468                     del self.sizes_booting[key]
469                     if nodes_excess > 1:
470                         self._later.stop_booting_node(size)
471                     return
472             except pykka.Timeout:
473                 pass
474
475     @_check_poll_freshness
476     def cancel_node_shutdown(self, size):
477         # Go through shutdown actors and see if there are any of the appropriate size that can be cancelled
478         for record in self.cloud_nodes.nodes.itervalues():
479             try:
480                 if (record.shutdown_actor is not None and
481                     record.cloud_node.size.id == size.id and
482                     record.shutdown_actor.cancel_shutdown("Node size is in wishlist").get(2)):
483                         return True
484             except (pykka.ActorDeadError, pykka.Timeout) as e:
485                 pass
486         return False
487
488     def _begin_node_shutdown(self, node_actor, cancellable):
489         cloud_node_obj = node_actor.cloud_node.get()
490         cloud_node_id = cloud_node_obj.id
491         record = self.cloud_nodes[cloud_node_id]
492         if record.shutdown_actor is not None:
493             return None
494         shutdown = self._node_shutdown.start(
495             timer_actor=self._timer, cloud_client=self._new_cloud(),
496             arvados_client=self._new_arvados(),
497             node_monitor=node_actor.actor_ref, cancellable=cancellable)
498         record.shutdown_actor = shutdown.proxy()
499         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
500
501     @_check_poll_freshness
502     def node_can_shutdown(self, node_actor):
503         try:
504             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
505                 self._begin_node_shutdown(node_actor, cancellable=True)
506             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
507                 # Node is unpaired, which means it probably exceeded its booting
508                 # grace period without a ping, so shut it down so we can boot a new
509                 # node in its place.
510                 self._begin_node_shutdown(node_actor, cancellable=False)
511             elif node_actor.in_state('down', 'fail').get():
512                 # Node is down and unlikely to come back.
513                 self._begin_node_shutdown(node_actor, cancellable=False)
514         except pykka.ActorDeadError as e:
515             # The monitor actor sends shutdown suggestions every time the
516             # node's state is updated, and these go into the daemon actor's
517             # message queue.  It's possible that the node has already been shut
518             # down (which shuts down the node monitor actor).  In that case,
519             # this message is stale and we'll get ActorDeadError when we try to
520             # access node_actor.  Log the error.
521             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
522
523     def node_finished_shutdown(self, shutdown_actor):
524         try:
525             cloud_node, success = self._get_actor_attrs(
526                 shutdown_actor, 'cloud_node', 'success')
527         except pykka.ActorDeadError:
528             return
529         cloud_node_id = cloud_node.id
530
531         try:
532             shutdown_actor.stop()
533         except pykka.ActorDeadError:
534             pass
535
536         try:
537             record = self.cloud_nodes[cloud_node_id]
538         except KeyError:
539             # Cloud node was already removed from the cloud node list
540             # supposedly while the destroy_node call was finishing its
541             # job.
542             return
543         record.shutdown_actor = None
544
545         if not success:
546             return
547
548         # Shutdown was successful, so stop the monitor actor, otherwise it
549         # will keep offering the node as a candidate for shutdown.
550         record.actor.stop()
551         record.actor = None
552
553         # If the node went from being booted to being shut down without ever
554         # appearing in the cloud node list, it will have the
555         # _nodemanager_recently_booted tag, so get rid of it so that the node
556         # can be forgotten completely.
557         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
558             del record.cloud_node._nodemanager_recently_booted
559
560     def shutdown(self):
561         self._logger.info("Shutting down after signal.")
562         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
563
564         # Shut down pollers
565         self._server_wishlist_actor.stop()
566         self._arvados_nodes_actor.stop()
567         self._cloud_nodes_actor.stop()
568
569         # Clear cloud node list
570         self.update_cloud_nodes([])
571
572         # Stop setup actors unless they are in the middle of setup.
573         setup_stops = {key: node.stop_if_no_cloud_node()
574                        for key, node in self.booting.iteritems()}
575         self.booting = {key: self.booting[key]
576                         for key in setup_stops if not setup_stops[key].get()}
577         self._later.await_shutdown()
578
579     def await_shutdown(self):
580         if self.booting:
581             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
582         else:
583             self.stop()