Merge branch '8567-docker-migrator' refs #8567
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22         self.shutdown_actor = None
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28
29     # Proxy the methods listed below to self.nodes.
30     def _proxy_method(name):
31         method = getattr(dict, name)
32         @functools.wraps(method, ('__name__', '__doc__'))
33         def wrapper(self, *args, **kwargs):
34             return method(self.nodes, *args, **kwargs)
35         return wrapper
36
37     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
38         locals()[_method_name] = _proxy_method(_method_name)
39
40     def record_key(self, record):
41         return self.item_key(getattr(record, self.RECORD_ATTR))
42
43     def add(self, record):
44         self.nodes[self.record_key(record)] = record
45
46     def update_record(self, key, item):
47         setattr(self.nodes[key], self.RECORD_ATTR, item)
48
49     def update_from(self, response):
50         unseen = set(self.nodes.iterkeys())
51         for item in response:
52             key = self.item_key(item)
53             if key in unseen:
54                 unseen.remove(key)
55                 self.update_record(key, item)
56             else:
57                 yield key, item
58         self.orphans = {key: self.nodes.pop(key) for key in unseen}
59
60     def unpaired(self):
61         return (record for record in self.nodes.itervalues()
62                 if getattr(record, self.PAIR_ATTR) is None)
63
64
65 class _CloudNodeTracker(_BaseNodeTracker):
66     RECORD_ATTR = 'cloud_node'
67     PAIR_ATTR = 'arvados_node'
68     item_key = staticmethod(lambda cloud_node: cloud_node.id)
69
70
71 class _ArvadosNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'arvados_node'
73     PAIR_ATTR = 'cloud_node'
74     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
75
76     def find_stale_node(self, stale_time):
77         for record in self.nodes.itervalues():
78             node = record.arvados_node
79             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
80                                           stale_time) and
81                   not cnode.timestamp_fresh(record.assignment_time,
82                                             stale_time)):
83                 return node
84         return None
85
86
87 class NodeManagerDaemonActor(actor_class):
88     """Node Manager daemon.
89
90     This actor subscribes to all information polls about cloud nodes,
91     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
92     for every cloud node, subscribing them to poll updates
93     appropriately.  It creates and destroys cloud nodes based on job queue
94     demand, and stops the corresponding ComputeNode actors when their work
95     is done.
96     """
97     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
98                  cloud_nodes_actor, cloud_update_actor, timer_actor,
99                  arvados_factory, cloud_factory,
100                  shutdown_windows, server_calculator,
101                  min_nodes, max_nodes,
102                  poll_stale_after=600,
103                  boot_fail_after=1800,
104                  node_stale_after=7200,
105                  node_setup_class=dispatch.ComputeNodeSetupActor,
106                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
107                  node_actor_class=dispatch.ComputeNodeMonitorActor,
108                  max_total_price=0):
109         super(NodeManagerDaemonActor, self).__init__()
110         self._node_setup = node_setup_class
111         self._node_shutdown = node_shutdown_class
112         self._node_actor = node_actor_class
113         self._cloud_updater = cloud_update_actor
114         self._timer = timer_actor
115         self._new_arvados = arvados_factory
116         self._new_cloud = cloud_factory
117         self._cloud_driver = self._new_cloud()
118         self._later = self.actor_ref.tell_proxy()
119         self.shutdown_windows = shutdown_windows
120         self.server_calculator = server_calculator
121         self.min_cloud_size = self.server_calculator.cheapest_size()
122         self.min_nodes = min_nodes
123         self.max_nodes = max_nodes
124         self.max_total_price = max_total_price
125         self.poll_stale_after = poll_stale_after
126         self.boot_fail_after = boot_fail_after
127         self.node_stale_after = node_stale_after
128         self.last_polls = {}
129         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
130             poll_actor = locals()[poll_name + '_actor']
131             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
132             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
133             self.last_polls[poll_name] = -self.poll_stale_after
134         self.cloud_nodes = _CloudNodeTracker()
135         self.arvados_nodes = _ArvadosNodeTracker()
136         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
137         self.sizes_booting = {} # Actor IDs to node size
138
139     def on_start(self):
140         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
141         self._logger.debug("Daemon started")
142
143     def _update_poll_time(self, poll_key):
144         self.last_polls[poll_key] = time.time()
145
146     def _pair_nodes(self, node_record, arvados_node):
147         self._logger.info("Cloud node %s is now paired with Arvados node %s",
148                           node_record.cloud_node.name, arvados_node['uuid'])
149         self._arvados_nodes_actor.subscribe_to(
150             arvados_node['uuid'], node_record.actor.update_arvados_node)
151         node_record.arvados_node = arvados_node
152         self.arvados_nodes.add(node_record)
153
154     def _new_node(self, cloud_node):
155         start_time = self._cloud_driver.node_start_time(cloud_node)
156         shutdown_timer = cnode.ShutdownTimer(start_time,
157                                              self.shutdown_windows)
158         actor = self._node_actor.start(
159             cloud_node=cloud_node,
160             cloud_node_start_time=start_time,
161             shutdown_timer=shutdown_timer,
162             cloud_fqdn_func=self._cloud_driver.node_fqdn,
163             update_actor=self._cloud_updater,
164             timer_actor=self._timer,
165             arvados_node=None,
166             poll_stale_after=self.poll_stale_after,
167             node_stale_after=self.node_stale_after,
168             cloud_client=self._cloud_driver,
169             boot_fail_after=self.boot_fail_after)
170         actorTell = actor.tell_proxy()
171         actorTell.subscribe(self._later.node_can_shutdown)
172         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
173                                              actorTell.update_cloud_node)
174         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
175         return record
176
177     def _register_cloud_node(self, node):
178         rec = self.cloud_nodes.get(node.id)
179         if rec is None:
180             self._logger.info("Registering new cloud node %s", node.id)
181             record = self._new_node(node)
182             self.cloud_nodes.add(record)
183         else:
184             rec.cloud_node = node
185
186     def update_cloud_nodes(self, nodelist):
187         self._update_poll_time('cloud_nodes')
188         for _, node in self.cloud_nodes.update_from(nodelist):
189             self._register_cloud_node(node)
190
191         self.try_pairing()
192
193         for record in self.cloud_nodes.orphans.itervalues():
194             if record.shutdown_actor:
195                 try:
196                     record.shutdown_actor.stop()
197                 except pykka.ActorDeadError:
198                     pass
199                 record.shutdown_actor = None
200
201             # A recently booted node is a node that successfully completed the
202             # setup actor but has not yet appeared in the cloud node list.
203             # This will have the tag _nodemanager_recently_booted on it, which
204             # means (if we're not shutting it down) we want to put it back into
205             # the cloud node list.  Once it really appears in the cloud list,
206             # the object in record.cloud_node will be replaced by a new one
207             # that lacks the "_nodemanager_recently_booted" tag.
208             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
209                 self.cloud_nodes.add(record)
210             else:
211                 # Node disappeared from the cloud node list.  Stop the monitor
212                 # actor if necessary and forget about the node.
213                 if record.actor:
214                     try:
215                         record.actor.stop()
216                     except pykka.ActorDeadError:
217                         pass
218                     record.actor = None
219                 record.cloud_node = None
220
221     def _register_arvados_node(self, key, arv_node):
222         self._logger.info("Registering new Arvados node %s", key)
223         record = _ComputeNodeRecord(arvados_node=arv_node)
224         self.arvados_nodes.add(record)
225
226     def update_arvados_nodes(self, nodelist):
227         self._update_poll_time('arvados_nodes')
228         for key, node in self.arvados_nodes.update_from(nodelist):
229             self._register_arvados_node(key, node)
230         self.try_pairing()
231
232     def try_pairing(self):
233         for record in self.cloud_nodes.unpaired():
234             for arv_rec in self.arvados_nodes.unpaired():
235                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
236                     self._pair_nodes(record, arv_rec.arvados_node)
237                     break
238
239     def _nodes_booting(self, size):
240         s = sum(1
241                 for c in self.booting.iterkeys()
242                 if size is None or self.sizes_booting[c].id == size.id)
243         return s
244
245     def _node_states(self, size):
246         proxy_states = []
247         states = []
248         for rec in self.cloud_nodes.nodes.itervalues():
249             if size is None or rec.cloud_node.size.id == size.id:
250                 if rec.shutdown_actor is None and rec.actor is not None:
251                     proxy_states.append(rec.actor.get_state())
252                 else:
253                     states.append("shutdown")
254         return states + pykka.get_all(proxy_states)
255
256     def _state_counts(self, size):
257         states = self._node_states(size)
258         counts = {
259             "booting": self._nodes_booting(size),
260             "unpaired": 0,
261             "busy": 0,
262             "idle": 0,
263             "down": 0,
264             "shutdown": 0
265         }
266         for s in states:
267             counts[s] = counts[s] + 1
268         return counts
269
270     def _nodes_up(self, counts):
271         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
272         return up
273
274     def _total_price(self):
275         cost = 0
276         cost += sum(self.sizes_booting[c].price
277                     for c in self.booting.iterkeys())
278         cost += sum(c.cloud_node.size.price
279                     for c in self.cloud_nodes.nodes.itervalues())
280         return cost
281
282     def _size_wishlist(self, size):
283         return sum(1 for c in self.last_wishlist if c.id == size.id)
284
285     def _nodes_wanted(self, size):
286         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
287         under_min = self.min_nodes - total_node_count
288         over_max = total_node_count - self.max_nodes
289         total_price = self._total_price()
290
291         counts = self._state_counts(size)
292
293         up_count = self._nodes_up(counts)
294         busy_count = counts["busy"]
295
296         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
297                           self._size_wishlist(size),
298                           up_count,
299                           counts["booting"],
300                           counts["unpaired"],
301                           counts["idle"],
302                           busy_count,
303                           counts["down"],
304                           counts["shutdown"])
305
306         if over_max >= 0:
307             return -over_max
308         elif under_min > 0 and size.id == self.min_cloud_size.id:
309             return under_min
310
311         wanted = self._size_wishlist(size) - (up_count - busy_count)
312         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
313             can_boot = int((self.max_total_price - total_price) / size.price)
314             if can_boot == 0:
315                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
316                                   size.name, size.price, self.max_total_price, total_price)
317             return can_boot
318         else:
319             return wanted
320
321     def _nodes_excess(self, size):
322         counts = self._state_counts(size)
323         up_count = self._nodes_up(counts)
324         if size.id == self.min_cloud_size.id:
325             up_count -= self.min_nodes
326         return up_count - (counts["busy"] + self._size_wishlist(size))
327
328     def update_server_wishlist(self, wishlist):
329         self._update_poll_time('server_wishlist')
330         self.last_wishlist = wishlist
331         for size in reversed(self.server_calculator.cloud_sizes):
332             try:
333                 nodes_wanted = self._nodes_wanted(size)
334                 if nodes_wanted > 0:
335                     self._later.start_node(size)
336                 elif (nodes_wanted < 0) and self.booting:
337                     self._later.stop_booting_node(size)
338             except Exception as e:
339                 self._logger.exception("while calculating nodes wanted for size %s", size)
340
341     def _check_poll_freshness(orig_func):
342         """Decorator to inhibit a method when poll information is stale.
343
344         This decorator checks the timestamps of all the poll information the
345         daemon has received.  The decorated method is only called if none
346         of the timestamps are considered stale.
347         """
348         @functools.wraps(orig_func)
349         def wrapper(self, *args, **kwargs):
350             now = time.time()
351             if all(now - t < self.poll_stale_after
352                    for t in self.last_polls.itervalues()):
353                 return orig_func(self, *args, **kwargs)
354             else:
355                 return None
356         return wrapper
357
358     @_check_poll_freshness
359     def start_node(self, cloud_size):
360         nodes_wanted = self._nodes_wanted(cloud_size)
361         if nodes_wanted < 1:
362             return None
363         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
364         self._logger.info("Want %i more %s nodes.  Booting a node.",
365                           nodes_wanted, cloud_size.name)
366         new_setup = self._node_setup.start(
367             timer_actor=self._timer,
368             arvados_client=self._new_arvados(),
369             arvados_node=arvados_node,
370             cloud_client=self._new_cloud(),
371             cloud_size=cloud_size).proxy()
372         self.booting[new_setup.actor_ref.actor_urn] = new_setup
373         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
374
375         if arvados_node is not None:
376             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
377                 time.time())
378         new_setup.subscribe(self._later.node_up)
379         if nodes_wanted > 1:
380             self._later.start_node(cloud_size)
381
382     def _get_actor_attrs(self, actor, *attr_names):
383         return pykka.get_all([getattr(actor, name) for name in attr_names])
384
385     def node_up(self, setup_proxy):
386         # Called when a SetupActor has completed.
387         cloud_node, arvados_node = self._get_actor_attrs(
388             setup_proxy, 'cloud_node', 'arvados_node')
389         setup_proxy.stop()
390
391         # If cloud_node is None then the node create wasn't
392         # successful and so there isn't anything to do.
393         if cloud_node is not None:
394             # Node creation succeeded.  Update cloud node list.
395             cloud_node._nodemanager_recently_booted = True
396             self._register_cloud_node(cloud_node)
397         del self.booting[setup_proxy.actor_ref.actor_urn]
398         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
399
400     @_check_poll_freshness
401     def stop_booting_node(self, size):
402         nodes_excess = self._nodes_excess(size)
403         if (nodes_excess < 1) or not self.booting:
404             return None
405         for key, node in self.booting.iteritems():
406             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
407                 del self.booting[key]
408                 del self.sizes_booting[key]
409
410                 if nodes_excess > 1:
411                     self._later.stop_booting_node(size)
412                 break
413
414     def _begin_node_shutdown(self, node_actor, cancellable):
415         cloud_node_obj = node_actor.cloud_node.get()
416         cloud_node_id = cloud_node_obj.id
417         record = self.cloud_nodes[cloud_node_id]
418         if record.shutdown_actor is not None:
419             return None
420         shutdown = self._node_shutdown.start(
421             timer_actor=self._timer, cloud_client=self._new_cloud(),
422             arvados_client=self._new_arvados(),
423             node_monitor=node_actor.actor_ref, cancellable=cancellable)
424         record.shutdown_actor = shutdown.proxy()
425         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
426
427     @_check_poll_freshness
428     def node_can_shutdown(self, node_actor):
429         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
430             self._begin_node_shutdown(node_actor, cancellable=True)
431         elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
432             # Node is unpaired, which means it probably exceeded its booting
433             # grace period without a ping, so shut it down so we can boot a new
434             # node in its place.
435             self._begin_node_shutdown(node_actor, cancellable=False)
436         elif node_actor.in_state('down').get():
437             # Node is down and unlikely to come back.
438             self._begin_node_shutdown(node_actor, cancellable=False)
439
440     def node_finished_shutdown(self, shutdown_actor):
441         try:
442             cloud_node, success = self._get_actor_attrs(
443                 shutdown_actor, 'cloud_node', 'success')
444         except pykka.ActorDeadError:
445             return
446         cloud_node_id = cloud_node.id
447         record = self.cloud_nodes[cloud_node_id]
448         shutdown_actor.stop()
449         record.shutdown_actor = None
450
451         if not success:
452             return
453
454         # Shutdown was successful, so stop the monitor actor, otherwise it
455         # will keep offering the node as a candidate for shutdown.
456         record.actor.stop()
457         record.actor = None
458
459         # If the node went from being booted to being shut down without ever
460         # appearing in the cloud node list, it will have the
461         # _nodemanager_recently_booted tag, so get rid of it so that the node
462         # can be forgotten completely.
463         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
464             del record.cloud_node._nodemanager_recently_booted
465
466     def shutdown(self):
467         self._logger.info("Shutting down after signal.")
468         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
469         setup_stops = {key: node.stop_if_no_cloud_node()
470                        for key, node in self.booting.iteritems()}
471         self.booting = {key: self.booting[key]
472                         for key in setup_stops if not setup_stops[key].get()}
473         self._later.await_shutdown()
474
475     def await_shutdown(self):
476         if self.booting:
477             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
478         else:
479             self.stop()