Merge branch 'master' into 10979-cancelled-job-nodes
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22         self.shutdown_actor = None
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, server_calculator,
101                  min_nodes, max_nodes,
102                  poll_stale_after=600,
103                  boot_fail_after=1800,
104                  node_stale_after=7200,
105                  node_setup_class=dispatch.ComputeNodeSetupActor,
106                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
107                  node_actor_class=dispatch.ComputeNodeMonitorActor,
108                  max_total_price=0):
109         super(NodeManagerDaemonActor, self).__init__()
110         self._node_setup = node_setup_class
111         self._node_shutdown = node_shutdown_class
112         self._node_actor = node_actor_class
113         self._cloud_updater = cloud_update_actor
114         self._timer = timer_actor
115         self._new_arvados = arvados_factory
116         self._new_cloud = cloud_factory
117         self._cloud_driver = self._new_cloud()
118         self._later = self.actor_ref.tell_proxy()
119         self.shutdown_windows = shutdown_windows
120         self.server_calculator = server_calculator
121         self.min_cloud_size = self.server_calculator.cheapest_size()
122         self.min_nodes = min_nodes
123         self.max_nodes = max_nodes
124         self.max_total_price = max_total_price
125         self.poll_stale_after = poll_stale_after
126         self.boot_fail_after = boot_fail_after
127         self.node_stale_after = node_stale_after
128         self.last_polls = {}
129         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
130             poll_actor = locals()[poll_name + '_actor']
131             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
132             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
133             self.last_polls[poll_name] = -self.poll_stale_after
134         self.cloud_nodes = _CloudNodeTracker()
135         self.arvados_nodes = _ArvadosNodeTracker()
136         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
137         self.sizes_booting = {} # Actor IDs to node size
138
139     def on_start(self):
140         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
141         self._logger.debug("Daemon started")
142
143     def _update_poll_time(self, poll_key):
144         self.last_polls[poll_key] = time.time()
145
146     def _pair_nodes(self, node_record, arvados_node):
147         self._logger.info("Cloud node %s is now paired with Arvados node %s",
148                           node_record.cloud_node.name, arvados_node['uuid'])
149         self._arvados_nodes_actor.subscribe_to(
150             arvados_node['uuid'], node_record.actor.update_arvados_node)
151         node_record.arvados_node = arvados_node
152         self.arvados_nodes.add(node_record)
153
154     def _new_node(self, cloud_node):
155         start_time = self._cloud_driver.node_start_time(cloud_node)
156         shutdown_timer = cnode.ShutdownTimer(start_time,
157                                              self.shutdown_windows)
158         actor = self._node_actor.start(
159             cloud_node=cloud_node,
160             cloud_node_start_time=start_time,
161             shutdown_timer=shutdown_timer,
162             cloud_fqdn_func=self._cloud_driver.node_fqdn,
163             update_actor=self._cloud_updater,
164             timer_actor=self._timer,
165             arvados_node=None,
166             poll_stale_after=self.poll_stale_after,
167             node_stale_after=self.node_stale_after,
168             cloud_client=self._cloud_driver,
169             boot_fail_after=self.boot_fail_after)
170         actorTell = actor.tell_proxy()
171         actorTell.subscribe(self._later.node_can_shutdown)
172         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
173                                              actorTell.update_cloud_node)
174         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
175         return record
176
177     def _register_cloud_node(self, node):
178         rec = self.cloud_nodes.get(node.id)
179         if rec is None:
180             self._logger.info("Registering new cloud node %s", node.id)
181             record = self._new_node(node)
182             self.cloud_nodes.add(record)
183         else:
184             rec.cloud_node = node
185
186     def update_cloud_nodes(self, nodelist):
187         self._update_poll_time('cloud_nodes')
188         for _, node in self.cloud_nodes.update_from(nodelist):
189             self._register_cloud_node(node)
190
191         self.try_pairing()
192
193         for record in self.cloud_nodes.orphans.itervalues():
194             if record.shutdown_actor:
195                 try:
196                     record.shutdown_actor.stop()
197                 except pykka.ActorDeadError:
198                     pass
199                 record.shutdown_actor = None
200
201             # A recently booted node is a node that successfully completed the
202             # setup actor but has not yet appeared in the cloud node list.
203             # This will have the tag _nodemanager_recently_booted on it, which
204             # means (if we're not shutting it down) we want to put it back into
205             # the cloud node list.  Once it really appears in the cloud list,
206             # the object in record.cloud_node will be replaced by a new one
207             # that lacks the "_nodemanager_recently_booted" tag.
208             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
209                 self.cloud_nodes.add(record)
210             else:
211                 # Node disappeared from the cloud node list.  Stop the monitor
212                 # actor if necessary and forget about the node.
213                 if record.actor:
214                     try:
215                         record.actor.stop()
216                     except pykka.ActorDeadError:
217                         pass
218                     record.actor = None
219                 record.cloud_node = None
220
221     def _register_arvados_node(self, key, arv_node):
222         self._logger.info("Registering new Arvados node %s", key)
223         record = _ComputeNodeRecord(arvados_node=arv_node)
224         self.arvados_nodes.add(record)
225
226     def update_arvados_nodes(self, nodelist):
227         self._update_poll_time('arvados_nodes')
228         for key, node in self.arvados_nodes.update_from(nodelist):
229             self._register_arvados_node(key, node)
230         self.try_pairing()
231
232     def try_pairing(self):
233         for record in self.cloud_nodes.unpaired():
234             for arv_rec in self.arvados_nodes.unpaired():
235                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
236                     self._pair_nodes(record, arv_rec.arvados_node)
237                     break
238
239     def _nodes_booting(self, size):
240         s = sum(1
241                 for c in self.booting.iterkeys()
242                 if size is None or self.sizes_booting[c].id == size.id)
243         return s
244
245     def _node_states(self, size):
246         states = pykka.get_all(rec.actor.get_state()
247                                for rec in self.cloud_nodes.nodes.itervalues()
248                                if ((size is None or rec.cloud_node.size.id == size.id) and
249                                    rec.shutdown_actor is None))
250         states += ['shutdown' for rec in self.cloud_nodes.nodes.itervalues()
251                    if ((size is None or rec.cloud_node.size.id == size.id) and
252                        rec.shutdown_actor is not None)]
253         return states
254
255     def _state_counts(self, size):
256         states = self._node_states(size)
257         counts = {
258             "booting": self._nodes_booting(size),
259             "unpaired": 0,
260             "busy": 0,
261             "idle": 0,
262             "down": 0,
263             "shutdown": 0
264         }
265         for s in states:
266             counts[s] = counts[s] + 1
267         return counts
268
269     def _nodes_up(self, counts):
270         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
271         return up
272
273     def _total_price(self):
274         cost = 0
275         cost += sum(self.sizes_booting[c].price
276                     for c in self.booting.iterkeys())
277         cost += sum(c.cloud_node.size.price
278                     for c in self.cloud_nodes.nodes.itervalues())
279         return cost
280
281     def _size_wishlist(self, size):
282         return sum(1 for c in self.last_wishlist if c.id == size.id)
283
284     def _nodes_wanted(self, size):
285         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
286         under_min = self.min_nodes - total_node_count
287         over_max = total_node_count - self.max_nodes
288         total_price = self._total_price()
289
290         counts = self._state_counts(size)
291
292         up_count = self._nodes_up(counts)
293         busy_count = counts["busy"]
294
295         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
296                           self._size_wishlist(size),
297                           up_count,
298                           counts["booting"],
299                           counts["unpaired"],
300                           counts["idle"],
301                           busy_count,
302                           counts["down"],
303                           counts["shutdown"])
304
305         if over_max >= 0:
306             return -over_max
307         elif under_min > 0 and size.id == self.min_cloud_size.id:
308             return under_min
309
310         wanted = self._size_wishlist(size) - (up_count - busy_count)
311         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
312             can_boot = int((self.max_total_price - total_price) / size.price)
313             if can_boot == 0:
314                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
315                                   size.name, size.price, self.max_total_price, total_price)
316             return can_boot
317         else:
318             return wanted
319
320     def _nodes_excess(self, size):
321         counts = self._state_counts(size)
322         up_count = self._nodes_up(counts)
323         if size.id == self.min_cloud_size.id:
324             up_count -= self.min_nodes
325         return up_count - (counts["busy"] + self._size_wishlist(size))
326
327     def update_server_wishlist(self, wishlist):
328         self._update_poll_time('server_wishlist')
329         self.last_wishlist = wishlist
330         for size in reversed(self.server_calculator.cloud_sizes):
331             try:
332                 nodes_wanted = self._nodes_wanted(size)
333                 if nodes_wanted > 0:
334                     self._later.start_node(size)
335                 elif (nodes_wanted < 0) and self.booting:
336                     self._later.stop_booting_node(size)
337             except Exception as e:
338                 self._logger.exception("while calculating nodes wanted for size %s", size)
339
340     def _check_poll_freshness(orig_func):
341         """Decorator to inhibit a method when poll information is stale.
342
343         This decorator checks the timestamps of all the poll information the
344         daemon has received.  The decorated method is only called if none
345         of the timestamps are considered stale.
346         """
347         @functools.wraps(orig_func)
348         def wrapper(self, *args, **kwargs):
349             now = time.time()
350             if all(now - t < self.poll_stale_after
351                    for t in self.last_polls.itervalues()):
352                 return orig_func(self, *args, **kwargs)
353             else:
354                 return None
355         return wrapper
356
357     @_check_poll_freshness
358     def start_node(self, cloud_size):
359         nodes_wanted = self._nodes_wanted(cloud_size)
360         if nodes_wanted < 1:
361             return None
362         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
363         self._logger.info("Want %i more %s nodes.  Booting a node.",
364                           nodes_wanted, cloud_size.name)
365         new_setup = self._node_setup.start(
366             timer_actor=self._timer,
367             arvados_client=self._new_arvados(),
368             arvados_node=arvados_node,
369             cloud_client=self._new_cloud(),
370             cloud_size=cloud_size).proxy()
371         self.booting[new_setup.actor_ref.actor_urn] = new_setup
372         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
373
374         if arvados_node is not None:
375             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
376                 time.time())
377         new_setup.subscribe(self._later.node_up)
378         if nodes_wanted > 1:
379             self._later.start_node(cloud_size)
380
381     def _get_actor_attrs(self, actor, *attr_names):
382         return pykka.get_all([getattr(actor, name) for name in attr_names])
383
384     def node_up(self, setup_proxy):
385         # Called when a SetupActor has completed.
386         cloud_node, arvados_node = self._get_actor_attrs(
387             setup_proxy, 'cloud_node', 'arvados_node')
388         setup_proxy.stop()
389
390         # If cloud_node is None then the node create wasn't
391         # successful and so there isn't anything to do.
392         if cloud_node is not None:
393             # Node creation succeeded.  Update cloud node list.
394             cloud_node._nodemanager_recently_booted = True
395             self._register_cloud_node(cloud_node)
396         del self.booting[setup_proxy.actor_ref.actor_urn]
397         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
398
399     @_check_poll_freshness
400     def stop_booting_node(self, size):
401         nodes_excess = self._nodes_excess(size)
402         if (nodes_excess < 1) or not self.booting:
403             return None
404         for key, node in self.booting.iteritems():
405             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
406                 del self.booting[key]
407                 del self.sizes_booting[key]
408
409                 if nodes_excess > 1:
410                     self._later.stop_booting_node(size)
411                 break
412
413     def _begin_node_shutdown(self, node_actor, cancellable):
414         cloud_node_obj = node_actor.cloud_node.get()
415         cloud_node_id = cloud_node_obj.id
416         record = self.cloud_nodes[cloud_node_id]
417         if record.shutdown_actor is not None:
418             return None
419         shutdown = self._node_shutdown.start(
420             timer_actor=self._timer, cloud_client=self._new_cloud(),
421             arvados_client=self._new_arvados(),
422             node_monitor=node_actor.actor_ref, cancellable=cancellable)
423         record.shutdown_actor = shutdown.proxy()
424         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
425
426     @_check_poll_freshness
427     def node_can_shutdown(self, node_actor):
428         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
429             self._begin_node_shutdown(node_actor, cancellable=True)
430         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
431             # Node is unpaired, which means it probably exceeded its booting
432             # grace period without a ping, so shut it down so we can boot a new
433             # node in its place.
434             self._begin_node_shutdown(node_actor, cancellable=False)
435         elif node_actor.in_state('down').get():
436             # Node is down and unlikely to come back.
437             self._begin_node_shutdown(node_actor, cancellable=False)
438
439     def node_finished_shutdown(self, shutdown_actor):
440         try:
441             cloud_node, success = self._get_actor_attrs(
442                 shutdown_actor, 'cloud_node', 'success')
443         except pykka.ActorDeadError:
444             return
445         cloud_node_id = cloud_node.id
446         record = self.cloud_nodes[cloud_node_id]
447         shutdown_actor.stop()
448         record.shutdown_actor = None
449
450         if not success:
451             return
452
453         # Shutdown was successful, so stop the monitor actor, otherwise it
454         # will keep offering the node as a candidate for shutdown.
455         record.actor.stop()
456         record.actor = None
457
458         # If the node went from being booted to being shut down without ever
459         # appearing in the cloud node list, it will have the
460         # _nodemanager_recently_booted tag, so get rid of it so that the node
461         # can be forgotten completely.
462         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
463             del record.cloud_node._nodemanager_recently_booted
464
465     def shutdown(self):
466         self._logger.info("Shutting down after signal.")
467         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
468         setup_stops = {key: node.stop_if_no_cloud_node()
469                        for key, node in self.booting.iteritems()}
470         self.booting = {key: self.booting[key]
471                         for key in setup_stops if not setup_stops[key].get()}
472         self._later.await_shutdown()
473
474     def await_shutdown(self):
475         if self.booting:
476             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
477         else:
478             self.stop()