11349: Add management server with /status.json
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from . import status
13 from .computenode import dispatch
14 from .config import actor_class
15
16 class _ComputeNodeRecord(object):
17     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
18                  assignment_time=float('-inf')):
19         self.actor = actor
20         self.cloud_node = cloud_node
21         self.arvados_node = arvados_node
22         self.assignment_time = assignment_time
23         self.shutdown_actor = None
24
25 class _BaseNodeTracker(object):
26     def __init__(self):
27         self.nodes = {}
28         self.orphans = {}
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def update_record(self, key, item):
48         setattr(self.nodes[key], self.RECORD_ATTR, item)
49
50     def update_from(self, response):
51         unseen = set(self.nodes.iterkeys())
52         for item in response:
53             key = self.item_key(item)
54             if key in unseen:
55                 unseen.remove(key)
56                 self.update_record(key, item)
57             else:
58                 yield key, item
59         self.orphans = {key: self.nodes.pop(key) for key in unseen}
60
61     def unpaired(self):
62         return (record for record in self.nodes.itervalues()
63                 if getattr(record, self.PAIR_ATTR) is None)
64
65
66 class _CloudNodeTracker(_BaseNodeTracker):
67     RECORD_ATTR = 'cloud_node'
68     PAIR_ATTR = 'arvados_node'
69     item_key = staticmethod(lambda cloud_node: cloud_node.id)
70
71
72 class _ArvadosNodeTracker(_BaseNodeTracker):
73     RECORD_ATTR = 'arvados_node'
74     PAIR_ATTR = 'cloud_node'
75     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
76
77     def find_stale_node(self, stale_time):
78         for record in self.nodes.itervalues():
79             node = record.arvados_node
80             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
81                                           stale_time) and
82                   not cnode.timestamp_fresh(record.assignment_time,
83                                             stale_time)):
84                 return node
85         return None
86
87
88 class NodeManagerDaemonActor(actor_class):
89     """Node Manager daemon.
90
91     This actor subscribes to all information polls about cloud nodes,
92     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
93     for every cloud node, subscribing them to poll updates
94     appropriately.  It creates and destroys cloud nodes based on job queue
95     demand, and stops the corresponding ComputeNode actors when their work
96     is done.
97     """
98     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
99                  cloud_nodes_actor, cloud_update_actor, timer_actor,
100                  arvados_factory, cloud_factory,
101                  shutdown_windows, server_calculator,
102                  min_nodes, max_nodes,
103                  poll_stale_after=600,
104                  boot_fail_after=1800,
105                  node_stale_after=7200,
106                  node_setup_class=dispatch.ComputeNodeSetupActor,
107                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
108                  node_actor_class=dispatch.ComputeNodeMonitorActor,
109                  max_total_price=0):
110         super(NodeManagerDaemonActor, self).__init__()
111         self._node_setup = node_setup_class
112         self._node_shutdown = node_shutdown_class
113         self._node_actor = node_actor_class
114         self._cloud_updater = cloud_update_actor
115         self._timer = timer_actor
116         self._new_arvados = arvados_factory
117         self._new_cloud = cloud_factory
118         self._cloud_driver = self._new_cloud()
119         self._later = self.actor_ref.tell_proxy()
120         self.shutdown_windows = shutdown_windows
121         self.server_calculator = server_calculator
122         self.min_cloud_size = self.server_calculator.cheapest_size()
123         self.min_nodes = min_nodes
124         self.max_nodes = max_nodes
125         self.max_total_price = max_total_price
126         self.poll_stale_after = poll_stale_after
127         self.boot_fail_after = boot_fail_after
128         self.node_stale_after = node_stale_after
129         self.last_polls = {}
130         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
131             poll_actor = locals()[poll_name + '_actor']
132             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
133             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
134             self.last_polls[poll_name] = -self.poll_stale_after
135         self.cloud_nodes = _CloudNodeTracker()
136         self.arvados_nodes = _ArvadosNodeTracker()
137         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
138         self.sizes_booting = {} # Actor IDs to node size
139
140     def on_start(self):
141         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
142         self._logger.debug("Daemon started")
143
144     def _update_poll_time(self, poll_key):
145         self.last_polls[poll_key] = time.time()
146
147     def _pair_nodes(self, node_record, arvados_node):
148         self._logger.info("Cloud node %s is now paired with Arvados node %s",
149                           node_record.cloud_node.name, arvados_node['uuid'])
150         self._arvados_nodes_actor.subscribe_to(
151             arvados_node['uuid'], node_record.actor.update_arvados_node)
152         node_record.arvados_node = arvados_node
153         self.arvados_nodes.add(node_record)
154
155     def _new_node(self, cloud_node):
156         start_time = self._cloud_driver.node_start_time(cloud_node)
157         shutdown_timer = cnode.ShutdownTimer(start_time,
158                                              self.shutdown_windows)
159         actor = self._node_actor.start(
160             cloud_node=cloud_node,
161             cloud_node_start_time=start_time,
162             shutdown_timer=shutdown_timer,
163             cloud_fqdn_func=self._cloud_driver.node_fqdn,
164             update_actor=self._cloud_updater,
165             timer_actor=self._timer,
166             arvados_node=None,
167             poll_stale_after=self.poll_stale_after,
168             node_stale_after=self.node_stale_after,
169             cloud_client=self._cloud_driver,
170             boot_fail_after=self.boot_fail_after)
171         actorTell = actor.tell_proxy()
172         actorTell.subscribe(self._later.node_can_shutdown)
173         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
174                                              actorTell.update_cloud_node)
175         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
176         return record
177
178     def _register_cloud_node(self, node):
179         rec = self.cloud_nodes.get(node.id)
180         if rec is None:
181             self._logger.info("Registering new cloud node %s", node.id)
182             record = self._new_node(node)
183             self.cloud_nodes.add(record)
184         else:
185             rec.cloud_node = node
186
187     def update_cloud_nodes(self, nodelist):
188         self._update_poll_time('cloud_nodes')
189         for _, node in self.cloud_nodes.update_from(nodelist):
190             self._register_cloud_node(node)
191
192         self.try_pairing()
193
194         for record in self.cloud_nodes.orphans.itervalues():
195             if record.shutdown_actor:
196                 try:
197                     record.shutdown_actor.stop()
198                 except pykka.ActorDeadError:
199                     pass
200                 record.shutdown_actor = None
201
202             # A recently booted node is a node that successfully completed the
203             # setup actor but has not yet appeared in the cloud node list.
204             # This will have the tag _nodemanager_recently_booted on it, which
205             # means (if we're not shutting it down) we want to put it back into
206             # the cloud node list.  Once it really appears in the cloud list,
207             # the object in record.cloud_node will be replaced by a new one
208             # that lacks the "_nodemanager_recently_booted" tag.
209             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
210                 self.cloud_nodes.add(record)
211             else:
212                 # Node disappeared from the cloud node list.  Stop the monitor
213                 # actor if necessary and forget about the node.
214                 if record.actor:
215                     try:
216                         record.actor.stop()
217                     except pykka.ActorDeadError:
218                         pass
219                     record.actor = None
220                 record.cloud_node = None
221
222     def _register_arvados_node(self, key, arv_node):
223         self._logger.info("Registering new Arvados node %s", key)
224         record = _ComputeNodeRecord(arvados_node=arv_node)
225         self.arvados_nodes.add(record)
226
227     def update_arvados_nodes(self, nodelist):
228         self._update_poll_time('arvados_nodes')
229         for key, node in self.arvados_nodes.update_from(nodelist):
230             self._register_arvados_node(key, node)
231         self.try_pairing()
232
233     def try_pairing(self):
234         for record in self.cloud_nodes.unpaired():
235             for arv_rec in self.arvados_nodes.unpaired():
236                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
237                     self._pair_nodes(record, arv_rec.arvados_node)
238                     break
239
240     def _nodes_booting(self, size):
241         s = sum(1
242                 for c in self.booting.iterkeys()
243                 if size is None or self.sizes_booting[c].id == size.id)
244         return s
245
246     def _node_states(self, size):
247         proxy_states = []
248         states = []
249         for rec in self.cloud_nodes.nodes.itervalues():
250             if size is None or rec.cloud_node.size.id == size.id:
251                 if rec.shutdown_actor is None and rec.actor is not None:
252                     proxy_states.append(rec.actor.get_state())
253                 else:
254                     states.append("shutdown")
255         return states + pykka.get_all(proxy_states)
256
257     def _update_tracker(self):
258         updates = {
259             k: 0
260             for k in status.tracker.keys()
261             if k.startswith('nodes_')
262         }
263         for s in self._node_states(size=None):
264             updates.setdefault('nodes_'+s, 0)
265             updates['nodes_'+s] += 1
266         status.tracker.update(updates)
267
268     def _state_counts(self, size):
269         states = self._node_states(size)
270         counts = {
271             "booting": self._nodes_booting(size),
272             "unpaired": 0,
273             "busy": 0,
274             "idle": 0,
275             "down": 0,
276             "shutdown": 0
277         }
278         for s in states:
279             counts[s] = counts[s] + 1
280         return counts
281
282     def _nodes_up(self, counts):
283         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
284         return up
285
286     def _total_price(self):
287         cost = 0
288         cost += sum(self.sizes_booting[c].price
289                     for c in self.booting.iterkeys())
290         cost += sum(c.cloud_node.size.price
291                     for c in self.cloud_nodes.nodes.itervalues())
292         return cost
293
294     def _size_wishlist(self, size):
295         return sum(1 for c in self.last_wishlist if c.id == size.id)
296
297     def _nodes_wanted(self, size):
298         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
299         under_min = self.min_nodes - total_node_count
300         over_max = total_node_count - self.max_nodes
301         total_price = self._total_price()
302
303         counts = self._state_counts(size)
304
305         up_count = self._nodes_up(counts)
306         busy_count = counts["busy"]
307
308         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
309                           self._size_wishlist(size),
310                           up_count,
311                           counts["booting"],
312                           counts["unpaired"],
313                           counts["idle"],
314                           busy_count,
315                           counts["down"],
316                           counts["shutdown"])
317
318         if over_max >= 0:
319             return -over_max
320         elif under_min > 0 and size.id == self.min_cloud_size.id:
321             return under_min
322
323         wanted = self._size_wishlist(size) - (up_count - busy_count)
324         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
325             can_boot = int((self.max_total_price - total_price) / size.price)
326             if can_boot == 0:
327                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
328                                   size.name, size.price, self.max_total_price, total_price)
329             return can_boot
330         else:
331             return wanted
332
333     def _nodes_excess(self, size):
334         counts = self._state_counts(size)
335         up_count = self._nodes_up(counts)
336         if size.id == self.min_cloud_size.id:
337             up_count -= self.min_nodes
338         return up_count - (counts["busy"] + self._size_wishlist(size))
339
340     def update_server_wishlist(self, wishlist):
341         self._update_poll_time('server_wishlist')
342         self.last_wishlist = wishlist
343         for size in reversed(self.server_calculator.cloud_sizes):
344             try:
345                 nodes_wanted = self._nodes_wanted(size)
346                 if nodes_wanted > 0:
347                     self._later.start_node(size)
348                 elif (nodes_wanted < 0) and self.booting:
349                     self._later.stop_booting_node(size)
350             except Exception as e:
351                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
352         try:
353             self._update_tracker()
354         except:
355             self._logger.exception("while updating tracker")
356
357     def _check_poll_freshness(orig_func):
358         """Decorator to inhibit a method when poll information is stale.
359
360         This decorator checks the timestamps of all the poll information the
361         daemon has received.  The decorated method is only called if none
362         of the timestamps are considered stale.
363         """
364         @functools.wraps(orig_func)
365         def wrapper(self, *args, **kwargs):
366             now = time.time()
367             if all(now - t < self.poll_stale_after
368                    for t in self.last_polls.itervalues()):
369                 return orig_func(self, *args, **kwargs)
370             else:
371                 return None
372         return wrapper
373
374     @_check_poll_freshness
375     def start_node(self, cloud_size):
376         nodes_wanted = self._nodes_wanted(cloud_size)
377         if nodes_wanted < 1:
378             return None
379         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
380         self._logger.info("Want %i more %s nodes.  Booting a node.",
381                           nodes_wanted, cloud_size.name)
382         new_setup = self._node_setup.start(
383             timer_actor=self._timer,
384             arvados_client=self._new_arvados(),
385             arvados_node=arvados_node,
386             cloud_client=self._new_cloud(),
387             cloud_size=cloud_size).proxy()
388         self.booting[new_setup.actor_ref.actor_urn] = new_setup
389         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
390
391         if arvados_node is not None:
392             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
393                 time.time())
394         new_setup.subscribe(self._later.node_up)
395         if nodes_wanted > 1:
396             self._later.start_node(cloud_size)
397
398     def _get_actor_attrs(self, actor, *attr_names):
399         return pykka.get_all([getattr(actor, name) for name in attr_names])
400
401     def node_up(self, setup_proxy):
402         # Called when a SetupActor has completed.
403         cloud_node, arvados_node = self._get_actor_attrs(
404             setup_proxy, 'cloud_node', 'arvados_node')
405         setup_proxy.stop()
406
407         # If cloud_node is None then the node create wasn't
408         # successful and so there isn't anything to do.
409         if cloud_node is not None:
410             # Node creation succeeded.  Update cloud node list.
411             cloud_node._nodemanager_recently_booted = True
412             self._register_cloud_node(cloud_node)
413         del self.booting[setup_proxy.actor_ref.actor_urn]
414         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
415
416     @_check_poll_freshness
417     def stop_booting_node(self, size):
418         nodes_excess = self._nodes_excess(size)
419         if (nodes_excess < 1) or not self.booting:
420             return None
421         for key, node in self.booting.iteritems():
422             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
423                 del self.booting[key]
424                 del self.sizes_booting[key]
425
426                 if nodes_excess > 1:
427                     self._later.stop_booting_node(size)
428                 break
429
430     def _begin_node_shutdown(self, node_actor, cancellable):
431         cloud_node_obj = node_actor.cloud_node.get()
432         cloud_node_id = cloud_node_obj.id
433         record = self.cloud_nodes[cloud_node_id]
434         if record.shutdown_actor is not None:
435             return None
436         shutdown = self._node_shutdown.start(
437             timer_actor=self._timer, cloud_client=self._new_cloud(),
438             arvados_client=self._new_arvados(),
439             node_monitor=node_actor.actor_ref, cancellable=cancellable)
440         record.shutdown_actor = shutdown.proxy()
441         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
442
443     @_check_poll_freshness
444     def node_can_shutdown(self, node_actor):
445         try:
446             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
447                 self._begin_node_shutdown(node_actor, cancellable=True)
448             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
449                 # Node is unpaired, which means it probably exceeded its booting
450                 # grace period without a ping, so shut it down so we can boot a new
451                 # node in its place.
452                 self._begin_node_shutdown(node_actor, cancellable=False)
453             elif node_actor.in_state('down').get():
454                 # Node is down and unlikely to come back.
455                 self._begin_node_shutdown(node_actor, cancellable=False)
456         except pykka.ActorDeadError as e:
457             # The monitor actor sends shutdown suggestions every time the
458             # node's state is updated, and these go into the daemon actor's
459             # message queue.  It's possible that the node has already been shut
460             # down (which shuts down the node monitor actor).  In that case,
461             # this message is stale and we'll get ActorDeadError when we try to
462             # access node_actor.  Log the error.
463             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
464
465     def node_finished_shutdown(self, shutdown_actor):
466         try:
467             cloud_node, success = self._get_actor_attrs(
468                 shutdown_actor, 'cloud_node', 'success')
469         except pykka.ActorDeadError:
470             return
471         cloud_node_id = cloud_node.id
472         record = self.cloud_nodes[cloud_node_id]
473         shutdown_actor.stop()
474         record.shutdown_actor = None
475
476         if not success:
477             return
478
479         # Shutdown was successful, so stop the monitor actor, otherwise it
480         # will keep offering the node as a candidate for shutdown.
481         record.actor.stop()
482         record.actor = None
483
484         # If the node went from being booted to being shut down without ever
485         # appearing in the cloud node list, it will have the
486         # _nodemanager_recently_booted tag, so get rid of it so that the node
487         # can be forgotten completely.
488         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
489             del record.cloud_node._nodemanager_recently_booted
490
491     def shutdown(self):
492         self._logger.info("Shutting down after signal.")
493         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
494         setup_stops = {key: node.stop_if_no_cloud_node()
495                        for key, node in self.booting.iteritems()}
496         self.booting = {key: self.booting[key]
497                         for key in setup_stops if not setup_stops[key].get()}
498         self._later.await_shutdown()
499
500     def await_shutdown(self):
501         if self.booting:
502             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
503         else:
504             self.stop()