9161: Adjusting behavior to accomodate down/broken/missing nodes.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70     def paired(self):
71         return (record for record in self.nodes.itervalues()
72                 if getattr(record, self.PAIR_ATTR) is not None)
73
74
75 class _CloudNodeTracker(_BaseNodeTracker):
76     RECORD_ATTR = 'cloud_node'
77     PAIR_ATTR = 'arvados_node'
78     item_key = staticmethod(lambda cloud_node: cloud_node.id)
79
80
81 class _ArvadosNodeTracker(_BaseNodeTracker):
82     RECORD_ATTR = 'arvados_node'
83     PAIR_ATTR = 'cloud_node'
84     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
85
86     def find_stale_node(self, stale_time):
87         for record in self.nodes.itervalues():
88             node = record.arvados_node
89             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
90                                           stale_time) and
91                   not cnode.timestamp_fresh(record.assignment_time,
92                                             stale_time)):
93                 return node
94         return None
95
96
97 class NodeManagerDaemonActor(actor_class):
98     """Node Manager daemon.
99
100     This actor subscribes to all information polls about cloud nodes,
101     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
102     for every cloud node, subscribing them to poll updates
103     appropriately.  It creates and destroys cloud nodes based on job queue
104     demand, and stops the corresponding ComputeNode actors when their work
105     is done.
106     """
107     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
108                  cloud_nodes_actor, cloud_update_actor, timer_actor,
109                  arvados_factory, cloud_factory,
110                  shutdown_windows, server_calculator,
111                  min_nodes, max_nodes,
112                  poll_stale_after=600,
113                  boot_fail_after=1800,
114                  node_stale_after=7200,
115                  node_setup_class=dispatch.ComputeNodeSetupActor,
116                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
117                  node_actor_class=dispatch.ComputeNodeMonitorActor,
118                  max_total_price=0):
119         super(NodeManagerDaemonActor, self).__init__()
120         self._node_setup = node_setup_class
121         self._node_shutdown = node_shutdown_class
122         self._node_actor = node_actor_class
123         self._cloud_updater = cloud_update_actor
124         self._timer = timer_actor
125         self._new_arvados = arvados_factory
126         self._new_cloud = cloud_factory
127         self._cloud_driver = self._new_cloud()
128         self._later = self.actor_ref.tell_proxy()
129         self.shutdown_windows = shutdown_windows
130         self.server_calculator = server_calculator
131         self.min_cloud_size = self.server_calculator.cheapest_size()
132         self.min_nodes = min_nodes
133         self.max_nodes = max_nodes
134         self.max_total_price = max_total_price
135         self.poll_stale_after = poll_stale_after
136         self.boot_fail_after = boot_fail_after
137         self.node_stale_after = node_stale_after
138         self.last_polls = {}
139         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
140             poll_actor = locals()[poll_name + '_actor']
141             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
142             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
143             self.last_polls[poll_name] = -self.poll_stale_after
144         self.cloud_nodes = _CloudNodeTracker()
145         self.arvados_nodes = _ArvadosNodeTracker()
146         self.booting = {}       # Arvados node UUID to ComputeNodeSetupActors
147         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
148         self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
149
150     def on_start(self):
151         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
152         self._logger.debug("Daemon started")
153
154     def _update_poll_time(self, poll_key):
155         self.last_polls[poll_key] = time.time()
156
157     def _pair_nodes(self, node_record, arvados_node):
158         self._logger.info("Cloud node %s is now paired with Arvados node %s",
159                           node_record.cloud_node.name, arvados_node['uuid'])
160         self._arvados_nodes_actor.subscribe_to(
161             arvados_node['uuid'], node_record.actor.update_arvados_node)
162         node_record.arvados_node = arvados_node
163         self.arvados_nodes.add(node_record)
164
165     def _new_node(self, cloud_node):
166         start_time = self._cloud_driver.node_start_time(cloud_node)
167         shutdown_timer = cnode.ShutdownTimer(start_time,
168                                              self.shutdown_windows)
169         actor = self._node_actor.start(
170             cloud_node=cloud_node,
171             cloud_node_start_time=start_time,
172             shutdown_timer=shutdown_timer,
173             cloud_fqdn_func=self._cloud_driver.node_fqdn,
174             update_actor=self._cloud_updater,
175             timer_actor=self._timer,
176             arvados_node=None,
177             poll_stale_after=self.poll_stale_after,
178             node_stale_after=self.node_stale_after,
179             cloud_client=self._cloud_driver,
180             boot_fail_after=self.boot_fail_after)
181         actorTell = actor.tell_proxy()
182         actorTell.subscribe(self._later.node_can_shutdown)
183         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
184                                              actorTell.update_cloud_node)
185         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
186         return record
187
188     def _register_cloud_node(self, node):
189         rec = self.cloud_nodes.get(node.id)
190         if rec is None:
191             self._logger.info("Registering new cloud node %s", node.id)
192             record = self._new_node(node)
193             self.cloud_nodes.add(record)
194         else:
195             rec.cloud_node = node
196
197     def update_cloud_nodes(self, nodelist):
198         self._update_poll_time('cloud_nodes')
199         for _, node in self.cloud_nodes.update_from(nodelist):
200             self._register_cloud_node(node)
201
202         self.try_pairing()
203
204         for key, record in self.cloud_nodes.orphans.iteritems():
205             if key in self.shutdowns:
206                 try:
207                     self.shutdowns[key].stop().get()
208                 except pykka.ActorDeadError:
209                     pass
210                 del self.shutdowns[key]
211                 del self.sizes_booting_shutdown[key]
212             record.actor.stop()
213             record.cloud_node = None
214
215     def _register_arvados_node(self, key, arv_node):
216         self._logger.info("Registering new Arvados node %s", key)
217         record = _ComputeNodeRecord(arvados_node=arv_node)
218         self.arvados_nodes.add(record)
219
220     def update_arvados_nodes(self, nodelist):
221         self._update_poll_time('arvados_nodes')
222         for key, node in self.arvados_nodes.update_from(nodelist):
223             self._register_arvados_node(key, node)
224         self.try_pairing()
225
226     def try_pairing(self):
227         for record in self.cloud_nodes.unpaired():
228             for arv_rec in self.arvados_nodes.unpaired():
229                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
230                     self._pair_nodes(record, arv_rec.arvados_node)
231                     break
232
233
234     def _nodes_booting(self, size):
235         s = sum(1
236                 for c in self.booting.iterkeys()
237                 if size is None or self.sizes_booting_shutdown[c].id == size.id)
238         return s
239
240     def _nodes_unpaired(self, size):
241         return sum(1
242                    for c in self.cloud_nodes.unpaired()
243                    if size is None or c.cloud_node.size.id == size.id)
244
245     def _nodes_paired(self, size):
246         return sum(1
247                   for c in self.cloud_nodes.paired()
248                   if size is None or c.cloud_node.size.id == size.id)
249
250     def _nodes_down(self, size):
251         # Make sure to iterate over self.cloud_nodes because what we're
252         # counting here are compute nodes that are reported by the cloud
253         # provider but are considered "down" by Arvados.
254         return sum(1 for down in
255                    pykka.get_all(rec.actor.in_state('down') for rec in
256                                  self.cloud_nodes.nodes.itervalues()
257                                  if ((size is None or rec.cloud_node.size.id == size.id) and
258                                      rec.cloud_node.id not in self.shutdowns))
259                    if down)
260
261     def _nodes_up(self, size):
262         up = (self._nodes_booting(size) + self._nodes_unpaired(size) + self._nodes_paired(size)) - (self._nodes_down(size) + self._size_shutdowns(size))
263         return up
264
265     def _total_price(self):
266         cost = 0
267         cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
268                   for c in self.booting.iterkeys())
269         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
270                     for c in self.cloud_nodes.nodes.itervalues())
271         return cost
272
273     def _nodes_busy(self, size):
274         return sum(1 for busy in
275                    pykka.get_all(rec.actor.in_state('busy') for rec in
276                                  self.cloud_nodes.nodes.itervalues()
277                                  if rec.cloud_node.size.id == size.id)
278                    if busy)
279
280     def _size_wishlist(self, size):
281         return sum(1 for c in self.last_wishlist if c.id == size.id)
282
283     def _size_shutdowns(self, size):
284         return sum(1
285                   for c in self.shutdowns.iterkeys()
286                   if size is None or self.sizes_booting_shutdown[c].id == size.id)
287
288     def _nodes_wanted(self, size):
289         total_up_count = self._nodes_up(None) + self._nodes_down(None)
290         under_min = self.min_nodes - total_up_count
291         over_max = total_up_count - self.max_nodes
292         total_price = self._total_price()
293
294         if over_max >= 0:
295             return -over_max
296         elif under_min > 0 and size.id == self.min_cloud_size.id:
297             return under_min
298
299         up_count = self._nodes_up(size)
300         booting_count = self._nodes_booting(size)
301         unpaired_count = self._nodes_unpaired(size)
302         paired_count = self._nodes_paired(size)
303         busy_count = self._nodes_busy(size)
304         down_count = self._nodes_down(size)
305         idle_count = paired_count - (busy_count+down_count)
306         shutdown_count = self._size_shutdowns(size)
307
308         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
309                           self._size_wishlist(size),
310                           up_count,
311                           booting_count,
312                           unpaired_count,
313                           idle_count,
314                           busy_count,
315                           down_count,
316                           shutdown_count)
317
318         wanted = self._size_wishlist(size) - (up_count - busy_count)
319         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
320             can_boot = int((self.max_total_price - total_price) / size.price)
321             if can_boot == 0:
322                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
323                                   size.name, size.price, self.max_total_price, total_price)
324             return can_boot
325         else:
326             return wanted
327
328     def _nodes_excess(self, size):
329         up_count = self._nodes_up(size)
330         if size.id == self.min_cloud_size.id:
331             up_count -= self.min_nodes
332         return up_count - (self._nodes_busy(size) + self._size_wishlist(size))
333
334     def update_server_wishlist(self, wishlist):
335         self._update_poll_time('server_wishlist')
336         self.last_wishlist = wishlist
337         for size in reversed(self.server_calculator.cloud_sizes):
338             try:
339                 nodes_wanted = self._nodes_wanted(size)
340                 if nodes_wanted > 0:
341                     self._later.start_node(size)
342                 elif (nodes_wanted < 0) and self.booting:
343                     self._later.stop_booting_node(size)
344             except Exception as e:
345                 self._logger.exception("while calculating nodes wanted for size %s", size)
346
347     def _check_poll_freshness(orig_func):
348         """Decorator to inhibit a method when poll information is stale.
349
350         This decorator checks the timestamps of all the poll information the
351         daemon has received.  The decorated method is only called if none
352         of the timestamps are considered stale.
353         """
354         @functools.wraps(orig_func)
355         def wrapper(self, *args, **kwargs):
356             now = time.time()
357             if all(now - t < self.poll_stale_after
358                    for t in self.last_polls.itervalues()):
359                 return orig_func(self, *args, **kwargs)
360             else:
361                 return None
362         return wrapper
363
364     @_check_poll_freshness
365     def start_node(self, cloud_size):
366         nodes_wanted = self._nodes_wanted(cloud_size)
367         if nodes_wanted < 1:
368             return None
369         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
370         self._logger.info("Want %i more %s nodes.  Booting a node.",
371                           nodes_wanted, cloud_size.name)
372         new_setup = self._node_setup.start(
373             timer_actor=self._timer,
374             arvados_client=self._new_arvados(),
375             arvados_node=arvados_node,
376             cloud_client=self._new_cloud(),
377             cloud_size=cloud_size).proxy()
378         self.booting[new_setup.actor_ref.actor_urn] = new_setup
379         self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
380
381         if arvados_node is not None:
382             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
383                 time.time())
384         new_setup.subscribe(self._later.node_up)
385         if nodes_wanted > 1:
386             self._later.start_node(cloud_size)
387
388     def _get_actor_attrs(self, actor, *attr_names):
389         return pykka.get_all([getattr(actor, name) for name in attr_names])
390
391     def node_up(self, setup_proxy):
392         # Called when a SetupActor has completed.
393         cloud_node, arvados_node = self._get_actor_attrs(
394             setup_proxy, 'cloud_node', 'arvados_node')
395         setup_proxy.stop()
396
397         # If cloud_node is None then the node create wasn't
398         # successful and so there isn't anything to do.
399         if cloud_node is not None:
400             # Node creation succeeded.  Update cloud node list.
401             self._register_cloud_node(cloud_node)
402         del self.booting[setup_proxy.actor_ref.actor_urn]
403         del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
404
405     @_check_poll_freshness
406     def stop_booting_node(self, size):
407         nodes_excess = self._nodes_excess(size)
408         if (nodes_excess < 1) or not self.booting:
409             return None
410         for key, node in self.booting.iteritems():
411             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
412                 del self.booting[key]
413                 del self.sizes_booting_shutdown[key]
414
415                 if nodes_excess > 1:
416                     self._later.stop_booting_node(size)
417                 break
418
419     def _begin_node_shutdown(self, node_actor, cancellable):
420         cloud_node_obj = node_actor.cloud_node.get()
421         cloud_node_id = cloud_node_obj.id
422         if cloud_node_id in self.shutdowns:
423             return None
424         shutdown = self._node_shutdown.start(
425             timer_actor=self._timer, cloud_client=self._new_cloud(),
426             arvados_client=self._new_arvados(),
427             node_monitor=node_actor.actor_ref, cancellable=cancellable)
428         self.shutdowns[cloud_node_id] = shutdown.proxy()
429         self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
430         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
431
432     @_check_poll_freshness
433     def node_can_shutdown(self, node_actor):
434         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
435             print("excess")
436             self._begin_node_shutdown(node_actor, cancellable=True)
437         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
438             # Node is unpaired, which means it probably exceeded its booting
439             # grace period without a ping, so shut it down so we can boot a new
440             # node in its place.
441             print("unpaired")
442             self._begin_node_shutdown(node_actor, cancellable=False)
443         elif node_actor.in_state('down').get():
444             # Node is down and unlikely to come back.
445             self._begin_node_shutdown(node_actor, cancellable=False)
446
447     def node_finished_shutdown(self, shutdown_actor):
448         cloud_node, success, cancel_reason = self._get_actor_attrs(
449             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
450         cloud_node_id = cloud_node.id
451         shutdown_actor.stop()
452         if not success:
453             if cancel_reason == self._node_shutdown.NODE_BROKEN:
454                 self.cloud_nodes.blacklist(cloud_node_id)
455             del self.shutdowns[cloud_node_id]
456             del self.sizes_booting_shutdown[cloud_node_id]
457         # On success, we want to leave the entry in self.shutdowns so that it
458         # won't try to shut down the node again.  It should disappear from the
459         # cloud node list, and the entry in self.shutdowns will get cleaned up
460         # by update_cloud_nodes.
461
462     def shutdown(self):
463         self._logger.info("Shutting down after signal.")
464         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
465         setup_stops = {key: node.stop_if_no_cloud_node()
466                        for key, node in self.booting.iteritems()}
467         self.booting = {key: self.booting[key]
468                         for key in setup_stops if not setup_stops[key].get()}
469         self._later.await_shutdown()
470
471     def await_shutdown(self):
472         if self.booting:
473             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
474         else:
475             self.stop()