Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import functools
9 import logging
10 import time
11
12 import pykka
13
14 from . import computenode as cnode
15 from . import status
16 from .computenode import dispatch
17 from .config import actor_class
18
19 class _ComputeNodeRecord(object):
20     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
21                  assignment_time=float('-inf')):
22         self.actor = actor
23         self.cloud_node = cloud_node
24         self.arvados_node = arvados_node
25         self.assignment_time = assignment_time
26         self.shutdown_actor = None
27
28 class _BaseNodeTracker(object):
29     def __init__(self):
30         self.nodes = {}
31         self.orphans = {}
32
33     # Proxy the methods listed below to self.nodes.
34     def _proxy_method(name):
35         method = getattr(dict, name)
36         @functools.wraps(method, ('__name__', '__doc__'))
37         def wrapper(self, *args, **kwargs):
38             return method(self.nodes, *args, **kwargs)
39         return wrapper
40
41     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
42         locals()[_method_name] = _proxy_method(_method_name)
43
44     def record_key(self, record):
45         return self.item_key(getattr(record, self.RECORD_ATTR))
46
47     def add(self, record):
48         self.nodes[self.record_key(record)] = record
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in unseen:
58                 unseen.remove(key)
59                 self.update_record(key, item)
60             else:
61                 yield key, item
62         self.orphans = {key: self.nodes.pop(key) for key in unseen}
63
64     def unpaired(self):
65         return (record for record in self.nodes.itervalues()
66                 if getattr(record, self.PAIR_ATTR) is None)
67
68
69 class _CloudNodeTracker(_BaseNodeTracker):
70     RECORD_ATTR = 'cloud_node'
71     PAIR_ATTR = 'arvados_node'
72     item_key = staticmethod(lambda cloud_node: cloud_node.id)
73
74
75 class _ArvadosNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'arvados_node'
77     PAIR_ATTR = 'cloud_node'
78     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
79
80     def find_stale_node(self, stale_time):
81         for record in self.nodes.itervalues():
82             node = record.arvados_node
83             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
84                                           stale_time) and
85                   not cnode.timestamp_fresh(record.assignment_time,
86                                             stale_time)):
87                 return node
88         return None
89
90
91 class NodeManagerDaemonActor(actor_class):
92     """Node Manager daemon.
93
94     This actor subscribes to all information polls about cloud nodes,
95     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
96     for every cloud node, subscribing them to poll updates
97     appropriately.  It creates and destroys cloud nodes based on job queue
98     demand, and stops the corresponding ComputeNode actors when their work
99     is done.
100     """
101     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
102                  cloud_nodes_actor, cloud_update_actor, timer_actor,
103                  arvados_factory, cloud_factory,
104                  shutdown_windows, server_calculator,
105                  min_nodes, max_nodes,
106                  poll_stale_after=600,
107                  boot_fail_after=1800,
108                  node_stale_after=7200,
109                  node_setup_class=dispatch.ComputeNodeSetupActor,
110                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
111                  node_actor_class=dispatch.ComputeNodeMonitorActor,
112                  max_total_price=0):
113         super(NodeManagerDaemonActor, self).__init__()
114         self._node_setup = node_setup_class
115         self._node_shutdown = node_shutdown_class
116         self._node_actor = node_actor_class
117         self._cloud_updater = cloud_update_actor
118         self._timer = timer_actor
119         self._new_arvados = arvados_factory
120         self._new_cloud = cloud_factory
121         self._cloud_driver = self._new_cloud()
122         self._later = self.actor_ref.tell_proxy()
123         self.shutdown_windows = shutdown_windows
124         self.server_calculator = server_calculator
125         self.min_cloud_size = self.server_calculator.cheapest_size()
126         self.min_nodes = min_nodes
127         self.max_nodes = max_nodes
128         self.node_quota = max_nodes
129         self.max_total_price = max_total_price
130         self.poll_stale_after = poll_stale_after
131         self.boot_fail_after = boot_fail_after
132         self.node_stale_after = node_stale_after
133         self.last_polls = {}
134         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
135             poll_actor = locals()[poll_name + '_actor']
136             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
137             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
138             self.last_polls[poll_name] = -self.poll_stale_after
139         self.cloud_nodes = _CloudNodeTracker()
140         self.arvados_nodes = _ArvadosNodeTracker()
141         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
142         self.sizes_booting = {} # Actor IDs to node size
143
144     def on_start(self):
145         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
146         self._logger.debug("Daemon started")
147
148     def _update_poll_time(self, poll_key):
149         self.last_polls[poll_key] = time.time()
150
151     def _pair_nodes(self, node_record, arvados_node):
152         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
153                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
154         self._arvados_nodes_actor.subscribe_to(
155             arvados_node['uuid'], node_record.actor.update_arvados_node)
156         node_record.arvados_node = arvados_node
157         self.arvados_nodes.add(node_record)
158
159     def _new_node(self, cloud_node):
160         start_time = self._cloud_driver.node_start_time(cloud_node)
161         shutdown_timer = cnode.ShutdownTimer(start_time,
162                                              self.shutdown_windows)
163         actor = self._node_actor.start(
164             cloud_node=cloud_node,
165             cloud_node_start_time=start_time,
166             shutdown_timer=shutdown_timer,
167             cloud_fqdn_func=self._cloud_driver.node_fqdn,
168             update_actor=self._cloud_updater,
169             timer_actor=self._timer,
170             arvados_node=None,
171             poll_stale_after=self.poll_stale_after,
172             node_stale_after=self.node_stale_after,
173             cloud_client=self._cloud_driver,
174             boot_fail_after=self.boot_fail_after)
175         actorTell = actor.tell_proxy()
176         actorTell.subscribe(self._later.node_can_shutdown)
177         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
178                                              actorTell.update_cloud_node)
179         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
180         return record
181
182     def _register_cloud_node(self, node):
183         rec = self.cloud_nodes.get(node.id)
184         if rec is None:
185             self._logger.info("Registering new cloud node %s", node.id)
186             record = self._new_node(node)
187             self.cloud_nodes.add(record)
188         else:
189             rec.cloud_node = node
190
191     def update_cloud_nodes(self, nodelist):
192         self._update_poll_time('cloud_nodes')
193         for _, node in self.cloud_nodes.update_from(nodelist):
194             self._register_cloud_node(node)
195
196         self.try_pairing()
197
198         for record in self.cloud_nodes.orphans.itervalues():
199             if record.shutdown_actor:
200                 try:
201                     record.shutdown_actor.stop()
202                 except pykka.ActorDeadError:
203                     pass
204                 record.shutdown_actor = None
205
206             # A recently booted node is a node that successfully completed the
207             # setup actor but has not yet appeared in the cloud node list.
208             # This will have the tag _nodemanager_recently_booted on it, which
209             # means (if we're not shutting it down) we want to put it back into
210             # the cloud node list.  Once it really appears in the cloud list,
211             # the object in record.cloud_node will be replaced by a new one
212             # that lacks the "_nodemanager_recently_booted" tag.
213             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
214                 self.cloud_nodes.add(record)
215             else:
216                 # Node disappeared from the cloud node list.  Stop the monitor
217                 # actor if necessary and forget about the node.
218                 if record.actor:
219                     try:
220                         record.actor.stop()
221                     except pykka.ActorDeadError:
222                         pass
223                     record.actor = None
224                 record.cloud_node = None
225
226     def _register_arvados_node(self, key, arv_node):
227         self._logger.info("Registering new Arvados node %s", key)
228         record = _ComputeNodeRecord(arvados_node=arv_node)
229         self.arvados_nodes.add(record)
230
231     def update_arvados_nodes(self, nodelist):
232         self._update_poll_time('arvados_nodes')
233         for key, node in self.arvados_nodes.update_from(nodelist):
234             self._register_arvados_node(key, node)
235         self.try_pairing()
236
237     def try_pairing(self):
238         for record in self.cloud_nodes.unpaired():
239             for arv_rec in self.arvados_nodes.unpaired():
240                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
241                     self._pair_nodes(record, arv_rec.arvados_node)
242                     break
243
244     def _nodes_booting(self, size):
245         s = sum(1
246                 for c in self.booting.iterkeys()
247                 if size is None or self.sizes_booting[c].id == size.id)
248         return s
249
250     def _node_states(self, size):
251         proxy_states = []
252         states = []
253         for rec in self.cloud_nodes.nodes.itervalues():
254             if size is None or rec.cloud_node.size.id == size.id:
255                 if rec.shutdown_actor is None and rec.actor is not None:
256                     proxy_states.append(rec.actor.get_state())
257                 else:
258                     states.append("shutdown")
259         return states + pykka.get_all(proxy_states)
260
261     def _update_tracker(self):
262         updates = {
263             k: 0
264             for k in status.tracker.keys()
265             if k.startswith('nodes_')
266         }
267         for s in self._node_states(size=None):
268             updates.setdefault('nodes_'+s, 0)
269             updates['nodes_'+s] += 1
270         updates['nodes_wish'] = len(self.last_wishlist)
271         status.tracker.update(updates)
272
273     def _state_counts(self, size):
274         states = self._node_states(size)
275         counts = {
276             "booting": self._nodes_booting(size),
277             "unpaired": 0,
278             "busy": 0,
279             "idle": 0,
280             "down": 0,
281             "shutdown": 0
282         }
283         for s in states:
284             counts[s] = counts[s] + 1
285         return counts
286
287     def _nodes_up(self, counts):
288         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
289         return up
290
291     def _total_price(self):
292         cost = 0
293         cost += sum(self.sizes_booting[c].price
294                     for c in self.booting.iterkeys())
295         cost += sum(c.cloud_node.size.price
296                     for c in self.cloud_nodes.nodes.itervalues())
297         return cost
298
299     def _size_wishlist(self, size):
300         return sum(1 for c in self.last_wishlist if c.id == size.id)
301
302     def _nodes_wanted(self, size):
303         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
304         under_min = self.min_nodes - total_node_count
305         over_max = total_node_count - self.node_quota
306         total_price = self._total_price()
307
308         counts = self._state_counts(size)
309
310         up_count = self._nodes_up(counts)
311         busy_count = counts["busy"]
312         wishlist_count = self._size_wishlist(size)
313
314         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
315                           wishlist_count,
316                           up_count,
317                           counts["booting"],
318                           counts["unpaired"],
319                           counts["idle"],
320                           busy_count,
321                           counts["down"],
322                           counts["shutdown"])
323
324         if over_max >= 0:
325             return -over_max
326         elif under_min > 0 and size.id == self.min_cloud_size.id:
327             return under_min
328
329         wanted = wishlist_count - (up_count - busy_count)
330         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
331             can_boot = int((self.max_total_price - total_price) / size.price)
332             if can_boot == 0:
333                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
334                                   size.name, size.price, self.max_total_price, total_price)
335             return can_boot
336         else:
337             return wanted
338
339     def _nodes_excess(self, size):
340         counts = self._state_counts(size)
341         up_count = self._nodes_up(counts)
342         if size.id == self.min_cloud_size.id:
343             up_count -= self.min_nodes
344         return up_count - (counts["busy"] + self._size_wishlist(size))
345
346     def update_server_wishlist(self, wishlist):
347         self._update_poll_time('server_wishlist')
348         self.last_wishlist = wishlist
349         for size in reversed(self.server_calculator.cloud_sizes):
350             try:
351                 nodes_wanted = self._nodes_wanted(size)
352                 if nodes_wanted > 0:
353                     self._later.start_node(size)
354                 elif (nodes_wanted < 0) and self.booting:
355                     self._later.stop_booting_node(size)
356             except Exception as e:
357                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
358         try:
359             self._update_tracker()
360         except:
361             self._logger.exception("while updating tracker")
362
363     def _check_poll_freshness(orig_func):
364         """Decorator to inhibit a method when poll information is stale.
365
366         This decorator checks the timestamps of all the poll information the
367         daemon has received.  The decorated method is only called if none
368         of the timestamps are considered stale.
369         """
370         @functools.wraps(orig_func)
371         def wrapper(self, *args, **kwargs):
372             now = time.time()
373             if all(now - t < self.poll_stale_after
374                    for t in self.last_polls.itervalues()):
375                 return orig_func(self, *args, **kwargs)
376             else:
377                 return None
378         return wrapper
379
380     @_check_poll_freshness
381     def start_node(self, cloud_size):
382         nodes_wanted = self._nodes_wanted(cloud_size)
383         if nodes_wanted < 1:
384             return None
385         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
386         self._logger.info("Want %i more %s nodes.  Booting a node.",
387                           nodes_wanted, cloud_size.name)
388         new_setup = self._node_setup.start(
389             timer_actor=self._timer,
390             arvados_client=self._new_arvados(),
391             arvados_node=arvados_node,
392             cloud_client=self._new_cloud(),
393             cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
394         self.booting[new_setup.actor_ref.actor_urn] = new_setup
395         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
396
397         if arvados_node is not None:
398             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
399                 time.time())
400         new_setup.subscribe(self._later.node_setup_finished)
401         if nodes_wanted > 1:
402             self._later.start_node(cloud_size)
403
404     def _get_actor_attrs(self, actor, *attr_names):
405         return pykka.get_all([getattr(actor, name) for name in attr_names])
406
407     def node_setup_finished(self, setup_proxy):
408         # Called when a SetupActor has completed.
409         cloud_node, arvados_node, error = self._get_actor_attrs(
410             setup_proxy, 'cloud_node', 'arvados_node', 'error')
411         setup_proxy.stop()
412
413         if cloud_node is None:
414             # If cloud_node is None then the node create wasn't successful.
415             if error == dispatch.QuotaExceeded:
416                 # We've hit a quota limit, so adjust node_quota to stop trying to
417                 # boot new nodes until the node count goes down.
418                 self.node_quota = len(self.cloud_nodes)
419                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
420         else:
421             # Node creation succeeded.  Update cloud node list.
422             cloud_node._nodemanager_recently_booted = True
423             self._register_cloud_node(cloud_node)
424
425             # Different quota policies may in force depending on the cloud
426             # provider, account limits, and the specific mix of nodes sizes
427             # that are already created.  If we are right at the quota limit,
428             # we want to probe to see if the last quota still applies or if we
429             # are allowed to create more nodes.
430             #
431             # For example, if the quota is actually based on core count, the
432             # quota might be 20 single-core machines or 10 dual-core machines.
433             # If we previously set node_quota to 10 dual core machines, but are
434             # now booting single core machines (actual quota 20), we want to
435             # allow the quota to expand so we don't get stuck at 10 machines
436             # forever.
437             if len(self.cloud_nodes) >= self.node_quota:
438                 self.node_quota = len(self.cloud_nodes)+1
439                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
440
441         self.node_quota = min(self.node_quota, self.max_nodes)
442         del self.booting[setup_proxy.actor_ref.actor_urn]
443         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
444
445     @_check_poll_freshness
446     def stop_booting_node(self, size):
447         nodes_excess = self._nodes_excess(size)
448         if (nodes_excess < 1) or not self.booting:
449             return None
450         for key, node in self.booting.iteritems():
451             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
452                 del self.booting[key]
453                 del self.sizes_booting[key]
454
455                 if nodes_excess > 1:
456                     self._later.stop_booting_node(size)
457                 break
458
459     def _begin_node_shutdown(self, node_actor, cancellable):
460         cloud_node_obj = node_actor.cloud_node.get()
461         cloud_node_id = cloud_node_obj.id
462         record = self.cloud_nodes[cloud_node_id]
463         if record.shutdown_actor is not None:
464             return None
465         shutdown = self._node_shutdown.start(
466             timer_actor=self._timer, cloud_client=self._new_cloud(),
467             arvados_client=self._new_arvados(),
468             node_monitor=node_actor.actor_ref, cancellable=cancellable)
469         record.shutdown_actor = shutdown.proxy()
470         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
471
472     @_check_poll_freshness
473     def node_can_shutdown(self, node_actor):
474         try:
475             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
476                 self._begin_node_shutdown(node_actor, cancellable=True)
477             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
478                 # Node is unpaired, which means it probably exceeded its booting
479                 # grace period without a ping, so shut it down so we can boot a new
480                 # node in its place.
481                 self._begin_node_shutdown(node_actor, cancellable=False)
482             elif node_actor.in_state('down').get():
483                 # Node is down and unlikely to come back.
484                 self._begin_node_shutdown(node_actor, cancellable=False)
485         except pykka.ActorDeadError as e:
486             # The monitor actor sends shutdown suggestions every time the
487             # node's state is updated, and these go into the daemon actor's
488             # message queue.  It's possible that the node has already been shut
489             # down (which shuts down the node monitor actor).  In that case,
490             # this message is stale and we'll get ActorDeadError when we try to
491             # access node_actor.  Log the error.
492             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
493
494     def node_finished_shutdown(self, shutdown_actor):
495         try:
496             cloud_node, success = self._get_actor_attrs(
497                 shutdown_actor, 'cloud_node', 'success')
498         except pykka.ActorDeadError:
499             return
500         cloud_node_id = cloud_node.id
501         record = self.cloud_nodes[cloud_node_id]
502         shutdown_actor.stop()
503         record.shutdown_actor = None
504
505         if not success:
506             return
507
508         # Shutdown was successful, so stop the monitor actor, otherwise it
509         # will keep offering the node as a candidate for shutdown.
510         record.actor.stop()
511         record.actor = None
512
513         # If the node went from being booted to being shut down without ever
514         # appearing in the cloud node list, it will have the
515         # _nodemanager_recently_booted tag, so get rid of it so that the node
516         # can be forgotten completely.
517         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
518             del record.cloud_node._nodemanager_recently_booted
519
520     def shutdown(self):
521         self._logger.info("Shutting down after signal.")
522         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
523
524         # Shut down pollers
525         self._server_wishlist_actor.stop()
526         self._arvados_nodes_actor.stop()
527         self._cloud_nodes_actor.stop()
528
529         # Clear cloud node list
530         self.update_cloud_nodes([])
531
532         # Stop setup actors unless they are in the middle of setup.
533         setup_stops = {key: node.stop_if_no_cloud_node()
534                        for key, node in self.booting.iteritems()}
535         self.booting = {key: self.booting[key]
536                         for key in setup_stops if not setup_stops[key].get()}
537         self._later.await_shutdown()
538
539     def await_shutdown(self):
540         if self.booting:
541             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
542         else:
543             self.stop()