8784: Fix test for latest firefox.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from . import status
13 from .computenode import dispatch
14 from .config import actor_class
15
16 class _ComputeNodeRecord(object):
17     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
18                  assignment_time=float('-inf')):
19         self.actor = actor
20         self.cloud_node = cloud_node
21         self.arvados_node = arvados_node
22         self.assignment_time = assignment_time
23         self.shutdown_actor = None
24
25 class _BaseNodeTracker(object):
26     def __init__(self):
27         self.nodes = {}
28         self.orphans = {}
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def update_record(self, key, item):
48         setattr(self.nodes[key], self.RECORD_ATTR, item)
49
50     def update_from(self, response):
51         unseen = set(self.nodes.iterkeys())
52         for item in response:
53             key = self.item_key(item)
54             if key in unseen:
55                 unseen.remove(key)
56                 self.update_record(key, item)
57             else:
58                 yield key, item
59         self.orphans = {key: self.nodes.pop(key) for key in unseen}
60
61     def unpaired(self):
62         return (record for record in self.nodes.itervalues()
63                 if getattr(record, self.PAIR_ATTR) is None)
64
65
66 class _CloudNodeTracker(_BaseNodeTracker):
67     RECORD_ATTR = 'cloud_node'
68     PAIR_ATTR = 'arvados_node'
69     item_key = staticmethod(lambda cloud_node: cloud_node.id)
70
71
72 class _ArvadosNodeTracker(_BaseNodeTracker):
73     RECORD_ATTR = 'arvados_node'
74     PAIR_ATTR = 'cloud_node'
75     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
76
77     def find_stale_node(self, stale_time):
78         for record in self.nodes.itervalues():
79             node = record.arvados_node
80             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
81                                           stale_time) and
82                   not cnode.timestamp_fresh(record.assignment_time,
83                                             stale_time)):
84                 return node
85         return None
86
87
88 class NodeManagerDaemonActor(actor_class):
89     """Node Manager daemon.
90
91     This actor subscribes to all information polls about cloud nodes,
92     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
93     for every cloud node, subscribing them to poll updates
94     appropriately.  It creates and destroys cloud nodes based on job queue
95     demand, and stops the corresponding ComputeNode actors when their work
96     is done.
97     """
98     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
99                  cloud_nodes_actor, cloud_update_actor, timer_actor,
100                  arvados_factory, cloud_factory,
101                  shutdown_windows, server_calculator,
102                  min_nodes, max_nodes,
103                  poll_stale_after=600,
104                  boot_fail_after=1800,
105                  node_stale_after=7200,
106                  node_setup_class=dispatch.ComputeNodeSetupActor,
107                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
108                  node_actor_class=dispatch.ComputeNodeMonitorActor,
109                  max_total_price=0):
110         super(NodeManagerDaemonActor, self).__init__()
111         self._node_setup = node_setup_class
112         self._node_shutdown = node_shutdown_class
113         self._node_actor = node_actor_class
114         self._cloud_updater = cloud_update_actor
115         self._timer = timer_actor
116         self._new_arvados = arvados_factory
117         self._new_cloud = cloud_factory
118         self._cloud_driver = self._new_cloud()
119         self._later = self.actor_ref.tell_proxy()
120         self.shutdown_windows = shutdown_windows
121         self.server_calculator = server_calculator
122         self.min_cloud_size = self.server_calculator.cheapest_size()
123         self.min_nodes = min_nodes
124         self.max_nodes = max_nodes
125         self.node_quota = max_nodes
126         self.max_total_price = max_total_price
127         self.poll_stale_after = poll_stale_after
128         self.boot_fail_after = boot_fail_after
129         self.node_stale_after = node_stale_after
130         self.last_polls = {}
131         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
132             poll_actor = locals()[poll_name + '_actor']
133             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
134             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
135             self.last_polls[poll_name] = -self.poll_stale_after
136         self.cloud_nodes = _CloudNodeTracker()
137         self.arvados_nodes = _ArvadosNodeTracker()
138         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
139         self.sizes_booting = {} # Actor IDs to node size
140
141     def on_start(self):
142         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
143         self._logger.debug("Daemon started")
144
145     def _update_poll_time(self, poll_key):
146         self.last_polls[poll_key] = time.time()
147
148     def _pair_nodes(self, node_record, arvados_node):
149         self._logger.info("Cloud node %s is now paired with Arvados node %s with hostname %s",
150                           node_record.cloud_node.name, arvados_node['uuid'], arvados_node['hostname'])
151         self._arvados_nodes_actor.subscribe_to(
152             arvados_node['uuid'], node_record.actor.update_arvados_node)
153         node_record.arvados_node = arvados_node
154         self.arvados_nodes.add(node_record)
155
156     def _new_node(self, cloud_node):
157         start_time = self._cloud_driver.node_start_time(cloud_node)
158         shutdown_timer = cnode.ShutdownTimer(start_time,
159                                              self.shutdown_windows)
160         actor = self._node_actor.start(
161             cloud_node=cloud_node,
162             cloud_node_start_time=start_time,
163             shutdown_timer=shutdown_timer,
164             cloud_fqdn_func=self._cloud_driver.node_fqdn,
165             update_actor=self._cloud_updater,
166             timer_actor=self._timer,
167             arvados_node=None,
168             poll_stale_after=self.poll_stale_after,
169             node_stale_after=self.node_stale_after,
170             cloud_client=self._cloud_driver,
171             boot_fail_after=self.boot_fail_after)
172         actorTell = actor.tell_proxy()
173         actorTell.subscribe(self._later.node_can_shutdown)
174         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
175                                              actorTell.update_cloud_node)
176         record = _ComputeNodeRecord(actor.proxy(), cloud_node)
177         return record
178
179     def _register_cloud_node(self, node):
180         rec = self.cloud_nodes.get(node.id)
181         if rec is None:
182             self._logger.info("Registering new cloud node %s", node.id)
183             record = self._new_node(node)
184             self.cloud_nodes.add(record)
185         else:
186             rec.cloud_node = node
187
188     def update_cloud_nodes(self, nodelist):
189         self._update_poll_time('cloud_nodes')
190         for _, node in self.cloud_nodes.update_from(nodelist):
191             self._register_cloud_node(node)
192
193         self.try_pairing()
194
195         for record in self.cloud_nodes.orphans.itervalues():
196             if record.shutdown_actor:
197                 try:
198                     record.shutdown_actor.stop()
199                 except pykka.ActorDeadError:
200                     pass
201                 record.shutdown_actor = None
202
203             # A recently booted node is a node that successfully completed the
204             # setup actor but has not yet appeared in the cloud node list.
205             # This will have the tag _nodemanager_recently_booted on it, which
206             # means (if we're not shutting it down) we want to put it back into
207             # the cloud node list.  Once it really appears in the cloud list,
208             # the object in record.cloud_node will be replaced by a new one
209             # that lacks the "_nodemanager_recently_booted" tag.
210             if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
211                 self.cloud_nodes.add(record)
212             else:
213                 # Node disappeared from the cloud node list.  Stop the monitor
214                 # actor if necessary and forget about the node.
215                 if record.actor:
216                     try:
217                         record.actor.stop()
218                     except pykka.ActorDeadError:
219                         pass
220                     record.actor = None
221                 record.cloud_node = None
222
223     def _register_arvados_node(self, key, arv_node):
224         self._logger.info("Registering new Arvados node %s", key)
225         record = _ComputeNodeRecord(arvados_node=arv_node)
226         self.arvados_nodes.add(record)
227
228     def update_arvados_nodes(self, nodelist):
229         self._update_poll_time('arvados_nodes')
230         for key, node in self.arvados_nodes.update_from(nodelist):
231             self._register_arvados_node(key, node)
232         self.try_pairing()
233
234     def try_pairing(self):
235         for record in self.cloud_nodes.unpaired():
236             for arv_rec in self.arvados_nodes.unpaired():
237                 if record.actor is not None and record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
238                     self._pair_nodes(record, arv_rec.arvados_node)
239                     break
240
241     def _nodes_booting(self, size):
242         s = sum(1
243                 for c in self.booting.iterkeys()
244                 if size is None or self.sizes_booting[c].id == size.id)
245         return s
246
247     def _node_states(self, size):
248         proxy_states = []
249         states = []
250         for rec in self.cloud_nodes.nodes.itervalues():
251             if size is None or rec.cloud_node.size.id == size.id:
252                 if rec.shutdown_actor is None and rec.actor is not None:
253                     proxy_states.append(rec.actor.get_state())
254                 else:
255                     states.append("shutdown")
256         return states + pykka.get_all(proxy_states)
257
258     def _update_tracker(self):
259         updates = {
260             k: 0
261             for k in status.tracker.keys()
262             if k.startswith('nodes_')
263         }
264         for s in self._node_states(size=None):
265             updates.setdefault('nodes_'+s, 0)
266             updates['nodes_'+s] += 1
267         updates['nodes_wish'] = len(self.last_wishlist)
268         status.tracker.update(updates)
269
270     def _state_counts(self, size):
271         states = self._node_states(size)
272         counts = {
273             "booting": self._nodes_booting(size),
274             "unpaired": 0,
275             "busy": 0,
276             "idle": 0,
277             "down": 0,
278             "shutdown": 0
279         }
280         for s in states:
281             counts[s] = counts[s] + 1
282         return counts
283
284     def _nodes_up(self, counts):
285         up = counts["booting"] + counts["unpaired"] + counts["idle"] + counts["busy"]
286         return up
287
288     def _total_price(self):
289         cost = 0
290         cost += sum(self.sizes_booting[c].price
291                     for c in self.booting.iterkeys())
292         cost += sum(c.cloud_node.size.price
293                     for c in self.cloud_nodes.nodes.itervalues())
294         return cost
295
296     def _size_wishlist(self, size):
297         return sum(1 for c in self.last_wishlist if c.id == size.id)
298
299     def _nodes_wanted(self, size):
300         total_node_count = self._nodes_booting(None) + len(self.cloud_nodes)
301         under_min = self.min_nodes - total_node_count
302         over_max = total_node_count - self.node_quota
303         total_price = self._total_price()
304
305         counts = self._state_counts(size)
306
307         up_count = self._nodes_up(counts)
308         busy_count = counts["busy"]
309         wishlist_count = self._size_wishlist(size)
310
311         self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
312                           wishlist_count,
313                           up_count,
314                           counts["booting"],
315                           counts["unpaired"],
316                           counts["idle"],
317                           busy_count,
318                           counts["down"],
319                           counts["shutdown"])
320
321         if over_max >= 0:
322             return -over_max
323         elif under_min > 0 and size.id == self.min_cloud_size.id:
324             return under_min
325
326         wanted = wishlist_count - (up_count - busy_count)
327         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
328             can_boot = int((self.max_total_price - total_price) / size.price)
329             if can_boot == 0:
330                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
331                                   size.name, size.price, self.max_total_price, total_price)
332             return can_boot
333         else:
334             return wanted
335
336     def _nodes_excess(self, size):
337         counts = self._state_counts(size)
338         up_count = self._nodes_up(counts)
339         if size.id == self.min_cloud_size.id:
340             up_count -= self.min_nodes
341         return up_count - (counts["busy"] + self._size_wishlist(size))
342
343     def update_server_wishlist(self, wishlist):
344         self._update_poll_time('server_wishlist')
345         self.last_wishlist = wishlist
346         for size in reversed(self.server_calculator.cloud_sizes):
347             try:
348                 nodes_wanted = self._nodes_wanted(size)
349                 if nodes_wanted > 0:
350                     self._later.start_node(size)
351                 elif (nodes_wanted < 0) and self.booting:
352                     self._later.stop_booting_node(size)
353             except Exception as e:
354                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
355         try:
356             self._update_tracker()
357         except:
358             self._logger.exception("while updating tracker")
359
360     def _check_poll_freshness(orig_func):
361         """Decorator to inhibit a method when poll information is stale.
362
363         This decorator checks the timestamps of all the poll information the
364         daemon has received.  The decorated method is only called if none
365         of the timestamps are considered stale.
366         """
367         @functools.wraps(orig_func)
368         def wrapper(self, *args, **kwargs):
369             now = time.time()
370             if all(now - t < self.poll_stale_after
371                    for t in self.last_polls.itervalues()):
372                 return orig_func(self, *args, **kwargs)
373             else:
374                 return None
375         return wrapper
376
377     @_check_poll_freshness
378     def start_node(self, cloud_size):
379         nodes_wanted = self._nodes_wanted(cloud_size)
380         if nodes_wanted < 1:
381             return None
382         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
383         self._logger.info("Want %i more %s nodes.  Booting a node.",
384                           nodes_wanted, cloud_size.name)
385         new_setup = self._node_setup.start(
386             timer_actor=self._timer,
387             arvados_client=self._new_arvados(),
388             arvados_node=arvados_node,
389             cloud_client=self._new_cloud(),
390             cloud_size=self.server_calculator.find_size(cloud_size.id)).proxy()
391         self.booting[new_setup.actor_ref.actor_urn] = new_setup
392         self.sizes_booting[new_setup.actor_ref.actor_urn] = cloud_size
393
394         if arvados_node is not None:
395             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
396                 time.time())
397         new_setup.subscribe(self._later.node_setup_finished)
398         if nodes_wanted > 1:
399             self._later.start_node(cloud_size)
400
401     def _get_actor_attrs(self, actor, *attr_names):
402         return pykka.get_all([getattr(actor, name) for name in attr_names])
403
404     def node_setup_finished(self, setup_proxy):
405         # Called when a SetupActor has completed.
406         cloud_node, arvados_node, error = self._get_actor_attrs(
407             setup_proxy, 'cloud_node', 'arvados_node', 'error')
408         setup_proxy.stop()
409
410         if cloud_node is None:
411             # If cloud_node is None then the node create wasn't successful.
412             if error == dispatch.QuotaExceeded:
413                 # We've hit a quota limit, so adjust node_quota to stop trying to
414                 # boot new nodes until the node count goes down.
415                 self.node_quota = len(self.cloud_nodes)
416                 self._logger.warning("After quota exceeded error setting node quota to %s", self.node_quota)
417         else:
418             # Node creation succeeded.  Update cloud node list.
419             cloud_node._nodemanager_recently_booted = True
420             self._register_cloud_node(cloud_node)
421
422             # Different quota policies may in force depending on the cloud
423             # provider, account limits, and the specific mix of nodes sizes
424             # that are already created.  If we are right at the quota limit,
425             # we want to probe to see if the last quota still applies or if we
426             # are allowed to create more nodes.
427             #
428             # For example, if the quota is actually based on core count, the
429             # quota might be 20 single-core machines or 10 dual-core machines.
430             # If we previously set node_quota to 10 dual core machines, but are
431             # now booting single core machines (actual quota 20), we want to
432             # allow the quota to expand so we don't get stuck at 10 machines
433             # forever.
434             if len(self.cloud_nodes) >= self.node_quota:
435                 self.node_quota = len(self.cloud_nodes)+1
436                 self._logger.warning("After successful boot setting node quota to %s", self.node_quota)
437
438         self.node_quota = min(self.node_quota, self.max_nodes)
439         del self.booting[setup_proxy.actor_ref.actor_urn]
440         del self.sizes_booting[setup_proxy.actor_ref.actor_urn]
441
442     @_check_poll_freshness
443     def stop_booting_node(self, size):
444         nodes_excess = self._nodes_excess(size)
445         if (nodes_excess < 1) or not self.booting:
446             return None
447         for key, node in self.booting.iteritems():
448             if node and node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
449                 del self.booting[key]
450                 del self.sizes_booting[key]
451
452                 if nodes_excess > 1:
453                     self._later.stop_booting_node(size)
454                 break
455
456     def _begin_node_shutdown(self, node_actor, cancellable):
457         cloud_node_obj = node_actor.cloud_node.get()
458         cloud_node_id = cloud_node_obj.id
459         record = self.cloud_nodes[cloud_node_id]
460         if record.shutdown_actor is not None:
461             return None
462         shutdown = self._node_shutdown.start(
463             timer_actor=self._timer, cloud_client=self._new_cloud(),
464             arvados_client=self._new_arvados(),
465             node_monitor=node_actor.actor_ref, cancellable=cancellable)
466         record.shutdown_actor = shutdown.proxy()
467         shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)
468
469     @_check_poll_freshness
470     def node_can_shutdown(self, node_actor):
471         try:
472             if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
473                 self._begin_node_shutdown(node_actor, cancellable=True)
474             elif self.cloud_nodes.nodes.get(node_actor.cloud_node.get().id).arvados_node is None:
475                 # Node is unpaired, which means it probably exceeded its booting
476                 # grace period without a ping, so shut it down so we can boot a new
477                 # node in its place.
478                 self._begin_node_shutdown(node_actor, cancellable=False)
479             elif node_actor.in_state('down').get():
480                 # Node is down and unlikely to come back.
481                 self._begin_node_shutdown(node_actor, cancellable=False)
482         except pykka.ActorDeadError as e:
483             # The monitor actor sends shutdown suggestions every time the
484             # node's state is updated, and these go into the daemon actor's
485             # message queue.  It's possible that the node has already been shut
486             # down (which shuts down the node monitor actor).  In that case,
487             # this message is stale and we'll get ActorDeadError when we try to
488             # access node_actor.  Log the error.
489             self._logger.debug("ActorDeadError in node_can_shutdown: %s", e)
490
491     def node_finished_shutdown(self, shutdown_actor):
492         try:
493             cloud_node, success = self._get_actor_attrs(
494                 shutdown_actor, 'cloud_node', 'success')
495         except pykka.ActorDeadError:
496             return
497         cloud_node_id = cloud_node.id
498         record = self.cloud_nodes[cloud_node_id]
499         shutdown_actor.stop()
500         record.shutdown_actor = None
501
502         if not success:
503             return
504
505         # Shutdown was successful, so stop the monitor actor, otherwise it
506         # will keep offering the node as a candidate for shutdown.
507         record.actor.stop()
508         record.actor = None
509
510         # If the node went from being booted to being shut down without ever
511         # appearing in the cloud node list, it will have the
512         # _nodemanager_recently_booted tag, so get rid of it so that the node
513         # can be forgotten completely.
514         if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
515             del record.cloud_node._nodemanager_recently_booted
516
517     def shutdown(self):
518         self._logger.info("Shutting down after signal.")
519         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
520
521         # Shut down pollers
522         self._server_wishlist_actor.stop()
523         self._arvados_nodes_actor.stop()
524         self._cloud_nodes_actor.stop()
525
526         # Clear cloud node list
527         self.update_cloud_nodes([])
528
529         # Stop setup actors unless they are in the middle of setup.
530         setup_stops = {key: node.stop_if_no_cloud_node()
531                        for key, node in self.booting.iteritems()}
532         self.booting = {key: self.booting[key]
533                         for key in setup_stops if not setup_stops[key].get()}
534         self._later.await_shutdown()
535
536     def await_shutdown(self):
537         if self.booting:
538             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
539         else:
540             self.stop()