Merge branch '11349-nodemanager-status-api'
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from . import status
13 from .computenode import dispatch
14 from .config import actor_class
15
16 class _ComputeNodeRecord(object):
17     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
18                  assignment_time=float('-inf')):
19         self.actor = actor
20         self.cloud_node = cloud_node
21         self.arvados_node = arvados_node
22         self.assignment_time = assignment_time
23         self.shutdown_actor = None
24
25 class _BaseNodeTracker(object):
26     def __init__(self):
27         self.nodes = {}
28         self.orphans = {}
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def update_record(self, key, item):
48         setattr(self.nodes[key], self.RECORD_ATTR, item)
49
50     def update_from(self, response):
51         unseen = set(self.nodes.iterkeys())
52         for item in response:
53             key = self.item_key(item)
54             if key in unseen:
55                 unseen.remove(key)
56                 self.update_record(key, item)
57             else:
58                 yield key, item
59         self.orphans = {key: self.nodes.pop(key) for key in unseen}
60
61     def unpaired(self):
62         return (record for record in self.nodes.itervalues()
63                 if getattr(record, self.PAIR_ATTR) is None)
64
65
66 class _CloudNodeTracker(_BaseNodeTracker):
67     RECORD_ATTR = 'cloud_node'
68     PAIR_ATTR = 'arvados_node'
69     item_key = staticmethod(lambda cloud_node: cloud_node.id)
70
71
72 class _ArvadosNodeTracker(_BaseNodeTracker):
73     RECORD_ATTR = 'arvados_node'
74     PAIR_ATTR = 'cloud_node'
75     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
76
77     def find_stale_node(self, stale_time):
78         for record in self.nodes.itervalues():
79             node = record.arvados_node
80             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
81                                           stale_time) and
82                   not cnode.timestamp_fresh(record.assignment_time,
83                                             stale_time)):
84                 return node
85         return None
86
87
88 class NodeManagerDaemonActor(actor_class):
89     """Node Manager daemon.
90
91     This actor subscribes to all information polls about cloud nodes,
92     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
93     for every cloud node, subscribing them to poll updates
94     appropriately.  It creates and destroys cloud nodes based on job queue
95     demand, and stops the corresponding ComputeNode actors when their work
96     is done.
97     """
98     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
99                  cloud_nodes_actor, cloud_update_actor, timer_actor,
100                  arvados_factory, cloud_factory,
101                  shutdown_windows, server_calculator,
102                  min_nodes, max_nodes,
103                  poll_stale_after=600,
104                  boot_fail_after=1800,
105                  node_stale_after=7200,
106                  node_setup_class=dispatch.ComputeNodeSetupActor,
107                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
108                  node_actor_class=dispatch.ComputeNodeMonitorActor,
109                  max_total_price=0):
110         super(NodeManagerDaemonActor, self).__init__()
111         self._node_setup = node_setup_class
112         self._node_shutdown = node_shutdown_class
113         self._node_actor = node_actor_class
114         self._cloud_updater = cloud_update_actor
115         self._timer = timer_actor
116         self._new_arvados = arvados_factory
117         self._new_cloud = cloud_factory
118         self._cloud_driver = self._new_cloud()
119         self._later = self.actor_ref.tell_proxy()
120         self.shutdown_windows = shutdown_windows
121         self.server_calculator = server_calculator
122         self.min_cloud_size = self.server_calculator.cheapest_size()
123         self.min_nodes = min_nodes
124         self.max_nodes = max_nodes
125         self.max_total_price = max_total_price
126         self.poll_stale_after = poll_stale_after
127         self.boot_fail_after = boot_fail_after
128         self.node_stale_after = node_stale_after
129         self.last_polls = {}
130         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
131             poll_actor = locals()[poll_name + '_actor']
132             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
133             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
134             self.last_polls[poll_name] = -self.poll_stale_after
135         self.cloud_nodes = _CloudNodeTracker()
136         self.arvados_nodes = _ArvadosNodeTracker()
137         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
138         self.sizes_booting = {} # Actor IDs to node size
139
140     def on_start(self):
141         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
142         self._logger.debug("Daemon started")
143
144     def _update_poll_time(self, poll_key):
145         self.last_polls[poll_key] = time.time()
146
147     def _pair_nodes(self, node_record, arvados_node):
148         self._logger.info("Cloud node %s is now paired with Arvados node %s",
149                           node_record.cloud_node.name, arvados_node['uuid'])
150         self._arvados_nodes_actor.subscribe_to(
151             arvados_node['uuid'], node_record.actor.update_arvados_node)
152         node_record.arvados_node = arvados_node
153         self.arvados_nodes.add(node_record)
154
155     def _new_node(self, cloud_node):
156         start_time = self._cloud_driver.node_start_time(cloud_node)
157         shutdown_timer = cnode.ShutdownTimer(start_time,
158                                              self.shutdown_windows)
159         actor = self._node_actor.start(
160             cloud_node=cloud_node,
161             cloud_node_start_time=start_time,
162             shutdown_timer=shutdown_timer,
163             cloud_fqdn_func=self._cloud_driver.node_fqdn,
164             update_actor=self._cloud_updater,
165             timer_actor=self._timer,
166             arvados_node=None,
167             poll_stale_after=self.poll_stale_after,
168             node_stale_after=self.node_stale_after,
169             cloud_client=self._cloud_driver,
170             boot_fail_after=self.boot_fail_after)
171         actorTell = actor.tell_proxy()
172         actorTell.subscribe(self._later.node_can_shutdown)
173         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
174                                              actorTell.update_cloud_node)
175         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
176         return record
177
178     def _register_cloud_node(self, node):
179         rec = self.cloud_nodes.get(node.id)
180         if rec is None:
181             self._logger.info("Registering new cloud node %s", node.id)
182             record = self._new_node(node)
183             self.cloud_nodes.add(record)
184         else:
185             rec.cloud_node = node
186
187     def update_cloud_nodes(self, nodelist):
188         self._update_poll_time('cloud_nodes')
189         for _, node in self.cloud_nodes.update_from(nodelist):
190             self._register_cloud_node(node)
191
192         self.try_pairing()
193
194         for record in self.cloud_nodes.orphans.itervalues():
195             if record.shutdown_actor:
196                 try:
197                     record.shutdown_actor.stop()
198                 except pykka.ActorDeadError:
199                     pass
200                 record.shutdown_actor = None
201
202             # A recently booted node is a node that successfully completed the
203             # setup actor but has not yet appeared in the cloud node list.
204             # This will have the tag _nodemanager_recently_booted on it, which
205             # means (if we're not shutting it down) we want to put it back into
206             # the cloud node list.  Once it really appears in the cloud list,
207             # the object in record.cloud_node will be replaced by a new one
208             # that lacks the "_nodemanager_recently_booted" tag.
209             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
210                 self.cloud_nodes.add(record)
211             else:
212                 # Node disappeared from the cloud node list.  Stop the monitor
213                 # actor if necessary and forget about the node.
214                 if record.actor:
215                     try:
216                         record.actor.stop()
217                     except pykka.ActorDeadError:
218                         pass
219                     record.actor = None
220                 record.cloud_node = None
221
222     def _register_arvados_node(self, key, arv_node):
223         self._logger.info("Registering new Arvados node %s", key)
224         record = _ComputeNodeRecord(arvados_node=arv_node)
225         self.arvados_nodes.add(record)
226
227     def update_arvados_nodes(self, nodelist):
228         self._update_poll_time('arvados_nodes')
229         for key, node in self.arvados_nodes.update_from(nodelist):
230             self._register_arvados_node(key, node)
231         self.try_pairing()
232
233     def try_pairing(self):
234         for record in self.cloud_nodes.unpaired():
235             for arv_rec in self.arvados_nodes.unpaired():
236                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
237                     self._pair_nodes(record, arv_rec.arvados_node)
238                     break
239
240     def _nodes_booting(self, size):
241         s = sum(1
242                 for c in self.booting.iterkeys()
243                 if size is None or self.sizes_booting[c].id == size.id)
244         return s
245
246     def _node_states(self, size):
247         proxy_states = []
248         states = []
249         for rec in self.cloud_nodes.nodes.itervalues():
250             if size is None or rec.cloud_node.size.id == size.id:
251                 if rec.shutdown_actor is None and rec.actor is not None:
252                     proxy_states.append(rec.actor.get_state())
253                 else:
254                     states.append("shutdown")
255         return states + pykka.get_all(proxy_states)
256
257     def _update_tracker(self):
258         updates = {
259             k: 0
260             for k in status.tracker.keys()
261             if k.startswith('nodes_')
262         }
263         for s in self._node_states(size=None):
264             updates.setdefault('nodes_'+s, 0)
265             updates['nodes_'+s] += 1
266         updates['nodes_wish'] = len(self.last_wishlist)
267         status.tracker.update(updates)
268
269     def _state_counts(self, size):
270         states = self._node_states(size)
271         counts = {
272             "booting": self._nodes_booting(size),
273             "unpaired": 0,
274             "busy": 0,
275             "idle": 0,
276             "down": 0,
277             "shutdown": 0
278         }
279         for s in states:
280             counts[s] = counts[s] + 1
281         return counts
282
283     def _nodes_up(self, counts):
284         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
285         return up
286
287     def _total_price(self):
288         cost = 0
289         cost += sum(self.sizes_booting[c].price
290                     for c in self.booting.iterkeys())
291         cost += sum(c.cloud_node.size.price
292                     for c in self.cloud_nodes.nodes.itervalues())
293         return cost
294
295     def _size_wishlist(self, size):
296         return sum(1 for c in self.last_wishlist if c.id == size.id)
297
298     def _nodes_wanted(self, size):
299         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
300         under_min = self.min_nodes - total_node_count
301         over_max = total_node_count - self.max_nodes
302         total_price = self._total_price()
303
304         counts = self._state_counts(size)
305
306         up_count = self._nodes_up(counts)
307         busy_count = counts["busy"]
308
309         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
310                           self._size_wishlist(size),
311                           up_count,
312                           counts["booting"],
313                           counts["unpaired"],
314                           counts["idle"],
315                           busy_count,
316                           counts["down"],
317                           counts["shutdown"])
318
319         if over_max >= 0:
320             return -over_max
321         elif under_min > 0 and size.id == self.min_cloud_size.id:
322             return under_min
323
324         wanted = self._size_wishlist(size) - (up_count - busy_count)
325         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
326             can_boot = int((self.max_total_price - total_price) / size.price)
327             if can_boot == 0:
328                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
329                                   size.name, size.price, self.max_total_price, total_price)
330             return can_boot
331         else:
332             return wanted
333
334     def _nodes_excess(self, size):
335         counts = self._state_counts(size)
336         up_count = self._nodes_up(counts)
337         if size.id == self.min_cloud_size.id:
338             up_count -= self.min_nodes
339         return up_count - (counts["busy"] + self._size_wishlist(size))
340
341     def update_server_wishlist(self, wishlist):
342         self._update_poll_time('server_wishlist')
343         self.last_wishlist = wishlist
344         for size in reversed(self.server_calculator.cloud_sizes):
345             try:
346                 nodes_wanted = self._nodes_wanted(size)
347                 if nodes_wanted > 0:
348                     self._later.start_node(size)
349                 elif (nodes_wanted < 0) and self.booting:
350                     self._later.stop_booting_node(size)
351             except Exception as e:
352                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
353         try:
354             self._update_tracker()
355         except:
356             self._logger.exception("while updating tracker")
357
358     def _check_poll_freshness(orig_func):
359         """Decorator to inhibit a method when poll information is stale.
360
361         This decorator checks the timestamps of all the poll information the
362         daemon has received.  The decorated method is only called if none
363         of the timestamps are considered stale.
364         """
365         @functools.wraps(orig_func)
366         def wrapper(self, *args, **kwargs):
367             now = time.time()
368             if all(now - t < self.poll_stale_after
369                    for t in self.last_polls.itervalues()):
370                 return orig_func(self, *args, **kwargs)
371             else:
372                 return None
373         return wrapper
374
375     @_check_poll_freshness
376     def start_node(self, cloud_size):
377         nodes_wanted = self._nodes_wanted(cloud_size)
378         if nodes_wanted < 1:
379             return None
380         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
381         self._logger.info("Want %i more %s nodes.  Booting a node.",
382                           nodes_wanted, cloud_size.name)
383         new_setup = self._node_setup.start(
384             timer_actor=self._timer,
385             arvados_client=self._new_arvados(),
386             arvados_node=arvados_node,
387             cloud_client=self._new_cloud(),
388             cloud_size=cloud_size).proxy()
389         self.booting[new_setup.actor_ref.actor_urn] = new_setup
390         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
391
392         if arvados_node is not None:
393             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
394                 time.time())
395         new_setup.subscribe(self._later.node_up)
396         if nodes_wanted > 1:
397             self._later.start_node(cloud_size)
398
399     def _get_actor_attrs(self, actor, *attr_names):
400         return pykka.get_all([getattr(actor, name) for name in attr_names])
401
402     def node_up(self, setup_proxy):
403         # Called when a SetupActor has completed.
404         cloud_node, arvados_node = self._get_actor_attrs(
405             setup_proxy, 'cloud_node', 'arvados_node')
406         setup_proxy.stop()
407
408         # If cloud_node is None then the node create wasn't
409         # successful and so there isn't anything to do.
410         if cloud_node is not None:
411             # Node creation succeeded.  Update cloud node list.
412             cloud_node._nodemanager_recently_booted = True
413             self._register_cloud_node(cloud_node)
414         del self.booting[setup_proxy.actor_ref.actor_urn]
415         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
416
417     @_check_poll_freshness
418     def stop_booting_node(self, size):
419         nodes_excess = self._nodes_excess(size)
420         if (nodes_excess < 1) or not self.booting:
421             return None
422         for key, node in self.booting.iteritems():
423             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
424                 del self.booting[key]
425                 del self.sizes_booting[key]
426
427                 if nodes_excess > 1:
428                     self._later.stop_booting_node(size)
429                 break
430
431     def _begin_node_shutdown(self, node_actor, cancellable):
432         cloud_node_obj = node_actor.cloud_node.get()
433         cloud_node_id = cloud_node_obj.id
434         record = self.cloud_nodes[cloud_node_id]
435         if record.shutdown_actor is not None:
436             return None
437         shutdown = self._node_shutdown.start(
438             timer_actor=self._timer, cloud_client=self._new_cloud(),
439             arvados_client=self._new_arvados(),
440             node_monitor=node_actor.actor_ref, cancellable=cancellable)
441         record.shutdown_actor = shutdown.proxy()
442         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
443
444     @_check_poll_freshness
445     def node_can_shutdown(self, node_actor):
446         try:
447             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
448                 self._begin_node_shutdown(node_actor, cancellable=True)
449             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
450                 # Node is unpaired, which means it probably exceeded its booting
451                 # grace period without a ping, so shut it down so we can boot a new
452                 # node in its place.
453                 self._begin_node_shutdown(node_actor, cancellable=False)
454             elif node_actor.in_state('down').get():
455                 # Node is down and unlikely to come back.
456                 self._begin_node_shutdown(node_actor, cancellable=False)
457         except pykka.ActorDeadError as e:
458             # The monitor actor sends shutdown suggestions every time the
459             # node's state is updated, and these go into the daemon actor's
460             # message queue.  It's possible that the node has already been shut
461             # down (which shuts down the node monitor actor).  In that case,
462             # this message is stale and we'll get ActorDeadError when we try to
463             # access node_actor.  Log the error.
464             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
465
466     def node_finished_shutdown(self, shutdown_actor):
467         try:
468             cloud_node, success = self._get_actor_attrs(
469                 shutdown_actor, 'cloud_node', 'success')
470         except pykka.ActorDeadError:
471             return
472         cloud_node_id = cloud_node.id
473         record = self.cloud_nodes[cloud_node_id]
474         shutdown_actor.stop()
475         record.shutdown_actor = None
476
477         if not success:
478             return
479
480         # Shutdown was successful, so stop the monitor actor, otherwise it
481         # will keep offering the node as a candidate for shutdown.
482         record.actor.stop()
483         record.actor = None
484
485         # If the node went from being booted to being shut down without ever
486         # appearing in the cloud node list, it will have the
487         # _nodemanager_recently_booted tag, so get rid of it so that the node
488         # can be forgotten completely.
489         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
490             del record.cloud_node._nodemanager_recently_booted
491
492     def shutdown(self):
493         self._logger.info("Shutting down after signal.")
494         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
495         setup_stops = {key: node.stop_if_no_cloud_node()
496                        for key, node in self.booting.iteritems()}
497         self.booting = {key: self.booting[key]
498                         for key in setup_stops if not setup_stops[key].get()}
499         self._later.await_shutdown()
500
501     def await_shutdown(self):
502         if self.booting:
503             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
504         else:
505             self.stop()