Merge branch '8697-ruby187-compat'
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks cloud node records, keyed by the cloud provider's node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        # The provider-assigned ID uniquely identifies a cloud node.
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks Arvados node records, keyed by Arvados UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
        """Return a tracked Arvados node that looks abandoned, or None.

        A node is considered stale when both its last modification time
        and its most recent assignment time are older than `stale_time`.
        Uses values() rather than the Python 2-only itervalues(); the two
        are equivalent here and values() also works under Python 3.
        """
        for record in self.nodes.values():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                  not cnode.timestamp_fresh(record.assignment_time,
                                            stale_time)):
                return node
        return None
91
92
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        # Actor classes are injectable parameters so tests can substitute stubs.
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        # Proxy to our own mailbox: method calls made through self._later are
        # delivered asynchronously as actor messages instead of running inline.
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        # Subscribe our update_* handlers to each poll actor, and start each
        # poll clock in the past so all polls begin stale until real data
        # arrives (see _check_poll_freshness).
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size

    def on_start(self):
        # Tag log lines with the tail of this actor's URN to distinguish
        # daemon instances.
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        """Record that fresh poll data for `poll_key` was just received."""
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        """Associate a cloud node record with its Arvados node counterpart."""
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        # Keep the node's monitor actor informed of future updates to this
        # Arvados node record.
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        """Start a monitor actor for `cloud_node` and return its record.

        The monitor subscribes us to shutdown eligibility notifications, and
        is itself subscribed to cloud listing updates for this node.
        """
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        return record

    def update_cloud_nodes(self, nodelist):
        """Handle a fresh cloud node listing.

        Registers newly seen nodes (reusing records from self.booted when we
        booted them ourselves), tries to pair each new node with an unpaired
        Arvados node, and tears down actors for nodes that disappeared from
        the listing.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            # Offer this cloud node to each unpaired Arvados node until one
            # accepts; .get() blocks on the monitor actor's answer.
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        # Nodes gone from the cloud listing: stop any in-flight shutdown
        # actor for them, then stop their monitor actor.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
                del self.sizes_booting_shutdown[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        """Handle a fresh Arvados node listing.

        Registers newly seen Arvados nodes, then tries to pair every
        still-unpaired Arvados node with an unpaired cloud node.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

    def _nodes_booting(self, size):
        """Count nodes of `size` being set up or booted but not yet listed.

        A `size` of None counts across all sizes.
        """
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting_shutdown[c].id == size.id)
        s += sum(1
                 for c in self.booted.itervalues()
                 if size is None or c.cloud_node.size.id == size.id)
        return s

    def _nodes_unpaired(self, size):
        """Count listed cloud nodes of `size` (None = all) with no Arvados pair."""
        return sum(1
                   for c in self.cloud_nodes.unpaired()
                   if size is None or c.cloud_node.size.id == size.id)

    def _nodes_booted(self, size):
        """Count cloud nodes of `size` (None = all) present in the cloud listing."""
        return sum(1
                  for c in self.cloud_nodes.nodes.itervalues()
                  if size is None or c.cloud_node.size.id == size.id)

    def _nodes_down(self, size):
        # Make sure to iterate over self.cloud_nodes because what we're
        # counting here are compute nodes that are reported by the cloud
        # provider but are considered "down" by Arvados.
        return sum(1 for down in
                   pykka.get_all(rec.actor.in_state('down') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if size is None or rec.cloud_node.size.id == size.id)
                   if down)

    def _nodes_up(self, size):
        """Count usable nodes of `size`: booting plus booted, minus down."""
        up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
        return up

    def _total_price(self):
        """Sum the price of all nodes we are booting or already running."""
        cost = 0
        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
                  for c in self.booting.iterkeys())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues())
        return cost

    def _nodes_busy(self, size):
        """Count listed cloud nodes of `size` whose monitor reports 'busy'."""
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
                   if busy)

    def _nodes_missing(self, size):
        """Count nodes of `size` whose Arvados record has gone missing/stale.

        Nodes already scheduled for shutdown are excluded from the count.
        """
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        """Count wishlist entries requesting this size."""
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        """Count in-progress shutdowns of nodes with this size."""
        sh = 0
        for c in self.shutdowns.iterkeys():
            try:
                if self.sizes_booting_shutdown[c].id == size.id:
                    sh += 1
            except pykka.ActorDeadError:
                # NOTE(review): this is a plain dict lookup, which should not
                # raise ActorDeadError; the guard looks vestigial -- confirm.
                pass
        return sh

    def _nodes_wanted(self, size):
        """Return how many more nodes of `size` to boot (negative = excess).

        Global min/max node counts take precedence over the per-size
        wishlist math, and max_total_price may clamp the result.
        """
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            # Only the cheapest size is used to satisfy the min_nodes floor.
            return under_min

        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
        shutdown_count = self._size_shutdowns(size)
        busy_count = self._nodes_busy(size)
        idle_count = self._nodes_up(size) - (busy_count + self._nodes_missing(size))

        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
                          self._size_wishlist(size),
                          idle_count + busy_count,
                          booting_count,
                          idle_count - booting_count,
                          busy_count,
                          shutdown_count)

        wanted = self._size_wishlist(size) - idle_count
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            # Clamp to however many nodes fit under the price ceiling.
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted

    def _nodes_excess(self, size):
        """Return how many nodes of `size` are surplus to current demand."""
        up_count = (self._nodes_booting(size) + self._nodes_booted(size)) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            # Nodes reserved for the min_nodes floor are never excess.
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
        """Handle a fresh job-queue wishlist: schedule boots and stops per size."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception as e:
                # Keep processing the remaining sizes even if one fails.
                self._logger.exception("while calculating nodes wanted for size %s", size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Boot one node of `cloud_size` if still wanted; re-queue for more."""
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        # Reuse a stale Arvados node record if one is available.
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            # Mark the assignment so find_stale_node won't hand this record
            # out again while the boot is in progress.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            # Boot the remaining nodes one message at a time.
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        """Fetch several attribute futures from an actor proxy in one batch."""
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        """Handle a setup actor finishing: track the node, arm the pairing timer."""
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]

        setup_proxy.stop()
        if cloud_node is not None:
            record = self.cloud_nodes.get(cloud_node.id)
            if record is None:
                # The cloud listing hasn't caught up yet; hold the record in
                # self.booted until update_cloud_nodes sees it.
                record = self._new_node(cloud_node)
                self.booted[cloud_node.id] = record
            # If the node hasn't paired by boot_fail_after, shut it down.
            self._timer.schedule(time.time() + self.boot_fail_after,
                                 self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one in-progress boot of `size` if we have excess nodes."""
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                # Safe to delete while iterating only because we break
                # immediately afterwards.
                del self.booting[key]
                del self.sizes_booting_shutdown[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for the node, unless one is already running.

        `cancellable` controls whether the shutdown may be aborted if the
        node becomes useful again.
        """
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        self.shutdowns[cloud_node_id] = shutdown.proxy()
        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        """Monitor actor callback: shut the node down if it's surplus."""
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        """Timer callback: force-shutdown a node that never paired in time."""
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        """Shutdown actor callback: clean up bookkeeping for the node."""
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                # Never try to use this node again.
                self.cloud_nodes.blacklist(cloud_node_id)
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
        del self.shutdowns[cloud_node_id]
        del self.sizes_booting_shutdown[cloud_node_id]

    def shutdown(self):
        """Begin daemon shutdown: stop node churn and cancel pending boots."""
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        # Keep only the setup actors that could not be stopped (they already
        # have a cloud node); wait on them in await_shutdown.
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        """Poll once a second until all pending boots finish, then stop."""
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()