Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         # Try to select a stale node record that have an assigned slot first
82         for record in sorted(self.nodes.itervalues(),
83                              key=lambda r: r.arvados_node['slot_number'],
84                              reverse=True):
85             node = record.arvados_node
86             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
87                                           stale_time) and
88                   not cnode.timestamp_fresh(record.assignment_time,
89                                             stale_time)):
90                 return node
91         return None
92
93
94 class NodeManagerDaemonActor(actor_class):
95     """Node Manager daemon.
96
97     This actor subscribes to all information polls about cloud nodes,
98     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
99     for every cloud node, subscribing them to poll updates
100     appropriately.  It creates and destroys cloud nodes based on job queue
101     demand, and stops the corresponding ComputeNode actors when their work
102     is done.
103     """
104     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
105                  cloud_nodes_actor, cloud_update_actor, timer_actor,
106                  arvados_factory, cloud_factory,
107                  shutdown_windows, server_calculator,
108                  min_nodes, max_nodes,
109                  poll_stale_after=600,
110                  boot_fail_after=1800,
111                  node_stale_after=7200,
112                  node_setup_class=dispatch.ComputeNodeSetupActor,
113                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
114                  node_actor_class=dispatch.ComputeNodeMonitorActor,
115                  max_total_price=0):
116         super(NodeManagerDaemonActor, self).__init__()
117         self._node_setup = node_setup_class
118         self._node_shutdown = node_shutdown_class
119         self._node_actor = node_actor_class
120         self._cloud_updater = cloud_update_actor
121         self._timer = timer_actor
122         self._new_arvados = arvados_factory
123         self._new_cloud = cloud_factory
124         self._cloud_driver = self._new_cloud()
125         self._later = self.actor_ref.tell_proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.node_quota = max_nodes
132         self.max_total_price = max_total_price
133         self.poll_stale_after = poll_stale_after
134         self.boot_fail_after = boot_fail_after
135         self.node_stale_after = node_stale_after
136         self.last_polls = {}
137         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
138             poll_actor = locals()[poll_name + '_actor']
139             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
140             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
141             self.last_polls[poll_name] = -self.poll_stale_after
142         self.cloud_nodes = _CloudNodeTracker()
143         self.arvados_nodes = _ArvadosNodeTracker()
144         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
145         self.sizes_booting = {} # Actor IDs to node size
146
147     def on_start(self):
148         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
149         self._logger.debug("Daemon started")
150
151     def _update_poll_time(self, poll_key):
152         self.last_polls[poll_key] = time.time()
153
154     def _pair_nodes(self, node_record, arvados_node):
155         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
156                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
157         self._arvados_nodes_actor.subscribe_to(
158             arvados_node['uuid'], node_record.actor.update_arvados_node)
159         node_record.arvados_node = arvados_node
160         self.arvados_nodes.add(node_record)
161
162     def _new_node(self, cloud_node):
163         start_time = self._cloud_driver.node_start_time(cloud_node)
164         shutdown_timer = cnode.ShutdownTimer(start_time,
165                                              self.shutdown_windows)
166         actor = self._node_actor.start(
167             cloud_node=cloud_node,
168             cloud_node_start_time=start_time,
169             shutdown_timer=shutdown_timer,
170             cloud_fqdn_func=self._cloud_driver.node_fqdn,
171             update_actor=self._cloud_updater,
172             timer_actor=self._timer,
173             arvados_node=None,
174             poll_stale_after=self.poll_stale_after,
175             node_stale_after=self.node_stale_after,
176             cloud_client=self._cloud_driver,
177             boot_fail_after=self.boot_fail_after)
178         actorTell = actor.tell_proxy()
179         actorTell.subscribe(self._later.node_can_shutdown)
180         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
181                                              actorTell.update_cloud_node)
182         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
183         return record
184
185     def _register_cloud_node(self, node):
186         rec = self.cloud_nodes.get(node.id)
187         if rec is None:
188             self._logger.info("Registering new cloud node %s", node.id)
189             record = self._new_node(node)
190             self.cloud_nodes.add(record)
191         else:
192             rec.cloud_node = node
193
194     def update_cloud_nodes(self, nodelist):
195         self._update_poll_time('cloud_nodes')
196         for _, node in self.cloud_nodes.update_from(nodelist):
197             self._register_cloud_node(node)
198
199         self.try_pairing()
200
201         for record in self.cloud_nodes.orphans.itervalues():
202             if record.shutdown_actor:
203                 try:
204                     record.shutdown_actor.stop()
205                 except pykka.ActorDeadError:
206                     pass
207                 record.shutdown_actor = None
208
209             # A recently booted node is a node that successfully completed the
210             # setup actor but has not yet appeared in the cloud node list.
211             # This will have the tag _nodemanager_recently_booted on it, which
212             # means (if we're not shutting it down) we want to put it back into
213             # the cloud node list.  Once it really appears in the cloud list,
214             # the object in record.cloud_node will be replaced by a new one
215             # that lacks the "_nodemanager_recently_booted" tag.
216             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
217                 self.cloud_nodes.add(record)
218             else:
219                 # Node disappeared from the cloud node list.  Stop the monitor
220                 # actor if necessary and forget about the node.
221                 if record.actor:
222                     try:
223                         record.actor.stop()
224                     except pykka.ActorDeadError:
225                         pass
226                     record.actor = None
227                 record.cloud_node = None
228
229     def _register_arvados_node(self, key, arv_node):
230         self._logger.info("Registering new Arvados node %s", key)
231         record = _ComputeNodeRecord(arvados_node=arv_node)
232         self.arvados_nodes.add(record)
233
234     def update_arvados_nodes(self, nodelist):
235         self._update_poll_time('arvados_nodes')
236         for key, node in self.arvados_nodes.update_from(nodelist):
237             self._register_arvados_node(key, node)
238         self.try_pairing()
239
240     def try_pairing(self):
241         for record in self.cloud_nodes.unpaired():
242             for arv_rec in self.arvados_nodes.unpaired():
243                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
244                     self._pair_nodes(record, arv_rec.arvados_node)
245                     break
246
247     def _nodes_booting(self, size):
248         s = sum(1
249                 for c in self.booting.iterkeys()
250                 if size is None or self.sizes_booting[c].id == size.id)
251         return s
252
253     def _node_states(self, size):
254         proxy_states = []
255         states = []
256         for rec in self.cloud_nodes.nodes.itervalues():
257             if size is None or rec.cloud_node.size.id == size.id:
258                 if rec.shutdown_actor is None and rec.actor is not None:
259                     proxy_states.append(rec.actor.get_state())
260                 else:
261                     states.append("shutdown")
262         return states + pykka.get_all(proxy_states)
263
264     def _update_tracker(self):
265         updates = {
266             k: 0
267             for k in status.tracker.keys()
268             if k.startswith('nodes_')
269         }
270         for s in self._node_states(size=None):
271             updates.setdefault('nodes_'+s, 0)
272             updates['nodes_'+s] += 1
273         updates['nodes_wish'] = len(self.last_wishlist)
274         status.tracker.update(updates)
275
276     def _state_counts(self, size):
277         states = self._node_states(size)
278         counts = {
279             "booting": self._nodes_booting(size),
280             "unpaired": 0,
281             "busy": 0,
282             "idle": 0,
283             "fail": 0,
284             "down": 0,
285             "shutdown": 0
286         }
287         for s in states:
288             counts[s] = counts[s] + 1
289         return counts
290
291     def _nodes_up(self, counts):
292         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
293         return up
294
295     def _total_price(self):
296         cost = 0
297         cost += sum(self.sizes_booting[c].price
298                     for c in self.booting.iterkeys())
299         cost += sum(c.cloud_node.size.price
300                     for c in self.cloud_nodes.nodes.itervalues())
301         return cost
302
303     def _size_wishlist(self, size):
304         return sum(1 for c in self.last_wishlist if c.id == size.id)
305
306     def _nodes_wanted(self, size):
307         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
308         under_min = self.min_nodes - total_node_count
309         over_max = total_node_count - self.node_quota
310         total_price = self._total_price()
311
312         counts = self._state_counts(size)
313
314         up_count = self._nodes_up(counts)
315         busy_count = counts["busy"]
316         wishlist_count = self._size_wishlist(size)
317
318         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
319                           wishlist_count,
320                           up_count,
321                           counts["booting"],
322                           counts["unpaired"],
323                           counts["idle"],
324                           busy_count,
325                           counts["down"]+counts["fail"],
326                           counts["shutdown"])
327
328         if over_max >= 0:
329             return -over_max
330         elif under_min > 0 and size.id == self.min_cloud_size.id:
331             return under_min
332
333         wanted = wishlist_count - (up_count - busy_count)
334         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
335             can_boot = int((self.max_total_price - total_price) / size.price)
336             if can_boot == 0:
337                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
338                                   size.name, size.price, self.max_total_price, total_price)
339             return can_boot
340         else:
341             return wanted
342
343     def _nodes_excess(self, size):
344         counts = self._state_counts(size)
345         up_count = self._nodes_up(counts)
346         if size.id == self.min_cloud_size.id:
347             up_count -= self.min_nodes
348         return up_count - (counts["busy"] + self._size_wishlist(size))
349
350     def update_server_wishlist(self, wishlist):
351         self._update_poll_time('server_wishlist')
352         self.last_wishlist = wishlist
353         for size in reversed(self.server_calculator.cloud_sizes):
354             try:
355                 nodes_wanted = self._nodes_wanted(size)
356                 if nodes_wanted > 0:
357                     self._later.start_node(size)
358                 elif (nodes_wanted < 0) and self.booting:
359                     self._later.stop_booting_node(size)
360             except Exception as e:
361                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
362         try:
363             self._update_tracker()
364         except:
365             self._logger.exception("while updating tracker")
366
367     def _check_poll_freshness(orig_func):
368         """Decorator to inhibit a method when poll information is stale.
369
370         This decorator checks the timestamps of all the poll information the
371         daemon has received.  The decorated method is only called if none
372         of the timestamps are considered stale.
373         """
374         @functools.wraps(orig_func)
375         def wrapper(self, *args, **kwargs):
376             now = time.time()
377             if all(now - t < self.poll_stale_after
378                    for t in self.last_polls.itervalues()):
379                 return orig_func(self, *args, **kwargs)
380             else:
381                 return None
382         return wrapper
383
384     @_check_poll_freshness
385     def start_node(self, cloud_size):
386         nodes_wanted = self._nodes_wanted(cloud_size)
387         if nodes_wanted < 1:
388             return None
389         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
390         self._logger.info("Want %i more %s nodes.  Booting a node.",
391                           nodes_wanted, cloud_size.name)
392         new_setup = self._node_setup.start(
393             timer_actor=self._timer,
394             arvados_client=self._new_arvados(),
395             arvados_node=arvados_node,
396             cloud_client=self._new_cloud(),
397             cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
398         self.booting[new_setup.actor_ref.actor_urn] = new_setup
399         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
400
401         if arvados_node is not None:
402             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
403                 time.time())
404         new_setup.subscribe(self._later.node_setup_finished)
405         if nodes_wanted > 1:
406             self._later.start_node(cloud_size)
407
408     def _get_actor_attrs(self, actor, *attr_names):
409         return pykka.get_all([getattr(actor, name) for name in attr_names])
410
411     def node_setup_finished(self, setup_proxy):
412         # Called when a SetupActor has completed.
413         cloud_node, arvados_node, error = self._get_actor_attrs(
414             setup_proxy, 'cloud_node', 'arvados_node', 'error')
415         setup_proxy.stop()
416
417         if cloud_node is None:
418             # If cloud_node is None then the node create wasn't successful.
419             if error == dispatch.QuotaExceeded:
420                 # We've hit a quota limit, so adjust node_quota to stop trying to
421                 # boot new nodes until the node count goes down.
422                 self.node_quota = len(self.cloud_nodes)
423                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
424         else:
425             # Node creation succeeded.  Update cloud node list.
426             cloud_node._nodemanager_recently_booted = True
427             self._register_cloud_node(cloud_node)
428
429             # Different quota policies may in force depending on the cloud
430             # provider, account limits, and the specific mix of nodes sizes
431             # that are already created.  If we are right at the quota limit,
432             # we want to probe to see if the last quota still applies or if we
433             # are allowed to create more nodes.
434             #
435             # For example, if the quota is actually based on core count, the
436             # quota might be 20 single-core machines or 10 dual-core machines.
437             # If we previously set node_quota to 10 dual core machines, but are
438             # now booting single core machines (actual quota 20), we want to
439             # allow the quota to expand so we don't get stuck at 10 machines
440             # forever.
441             if len(self.cloud_nodes) >= self.node_quota:
442                 self.node_quota = len(self.cloud_nodes)+1
443                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
444
445         self.node_quota = min(self.node_quota, self.max_nodes)
446         del self.booting[setup_proxy.actor_ref.actor_urn]
447         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
448
449     @_check_poll_freshness
450     def stop_booting_node(self, size):
451         nodes_excess = self._nodes_excess(size)
452         if (nodes_excess < 1) or not self.booting:
453             return None
454         for key, node in self.booting.iteritems():
455             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
456                 del self.booting[key]
457                 del self.sizes_booting[key]
458
459                 if nodes_excess > 1:
460                     self._later.stop_booting_node(size)
461                 break
462
463     def _begin_node_shutdown(self, node_actor, cancellable):
464         cloud_node_obj = node_actor.cloud_node.get()
465         cloud_node_id = cloud_node_obj.id
466         record = self.cloud_nodes[cloud_node_id]
467         if record.shutdown_actor is not None:
468             return None
469         shutdown = self._node_shutdown.start(
470             timer_actor=self._timer, cloud_client=self._new_cloud(),
471             arvados_client=self._new_arvados(),
472             node_monitor=node_actor.actor_ref, cancellable=cancellable)
473         record.shutdown_actor = shutdown.proxy()
474         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
475
476     @_check_poll_freshness
477     def node_can_shutdown(self, node_actor):
478         try:
479             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
480                 self._begin_node_shutdown(node_actor, cancellable=True)
481             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
482                 # Node is unpaired, which means it probably exceeded its booting
483                 # grace period without a ping, so shut it down so we can boot a new
484                 # node in its place.
485                 self._begin_node_shutdown(node_actor, cancellable=False)
486             elif node_actor.in_state('down', 'fail').get():
487                 # Node is down and unlikely to come back.
488                 self._begin_node_shutdown(node_actor, cancellable=False)
489         except pykka.ActorDeadError as e:
490             # The monitor actor sends shutdown suggestions every time the
491             # node's state is updated, and these go into the daemon actor's
492             # message queue.  It's possible that the node has already been shut
493             # down (which shuts down the node monitor actor).  In that case,
494             # this message is stale and we'll get ActorDeadError when we try to
495             # access node_actor.  Log the error.
496             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
497
498     def node_finished_shutdown(self, shutdown_actor):
499         try:
500             cloud_node, success = self._get_actor_attrs(
501                 shutdown_actor, 'cloud_node', 'success')
502         except pykka.ActorDeadError:
503             return
504         cloud_node_id = cloud_node.id
505
506         try:
507             shutdown_actor.stop()
508         except pykka.ActorDeadError:
509             pass
510
511         try:
512             record = self.cloud_nodes[cloud_node_id]
513         except KeyError:
514             # Cloud node was already removed from the cloud node list
515             # supposedly while the destroy_node call was finishing its
516             # job.
517             return
518         record.shutdown_actor = None
519
520         if not success:
521             return
522
523         # Shutdown was successful, so stop the monitor actor, otherwise it
524         # will keep offering the node as a candidate for shutdown.
525         record.actor.stop()
526         record.actor = None
527
528         # If the node went from being booted to being shut down without ever
529         # appearing in the cloud node list, it will have the
530         # _nodemanager_recently_booted tag, so get rid of it so that the node
531         # can be forgotten completely.
532         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
533             del record.cloud_node._nodemanager_recently_booted
534
535     def shutdown(self):
536         self._logger.info("Shutting down after signal.")
537         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
538
539         # Shut down pollers
540         self._server_wishlist_actor.stop()
541         self._arvados_nodes_actor.stop()
542         self._cloud_nodes_actor.stop()
543
544         # Clear cloud node list
545         self.update_cloud_nodes([])
546
547         # Stop setup actors unless they are in the middle of setup.
548         setup_stops = {key: node.stop_if_no_cloud_node()
549                        for key, node in self.booting.iteritems()}
550         self.booting = {key: self.booting[key]
551                         for key in setup_stops if not setup_stops[key].get()}
552         self._later.await_shutdown()
553
554     def await_shutdown(self):
555         if self.booting:
556             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
557         else:
558             self.stop()