Merge branch 'master' into 7490-datamanager-dont-die-return-error
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
71 class _CloudNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'cloud_node'
73     PAIR_ATTR = 'arvados_node'
74     item_key = staticmethod(lambda cloud_node: cloud_node.id)
75
76
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78     RECORD_ATTR = 'arvados_node'
79     PAIR_ATTR = 'cloud_node'
80     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
81
82     def find_stale_node(self, stale_time):
83         for record in self.nodes.itervalues():
84             node = record.arvados_node
85             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
86                                           stale_time) and
87                   not cnode.timestamp_fresh(record.assignment_time,
88                                             stale_time)):
89                 return node
90         return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor,
114                  max_total_price=0):
115         super(NodeManagerDaemonActor, self).__init__()
116         self._node_setup = node_setup_class
117         self._node_shutdown = node_shutdown_class
118         self._node_actor = node_actor_class
119         self._cloud_updater = cloud_update_actor
120         self._timer = timer_actor
121         self._new_arvados = arvados_factory
122         self._new_cloud = cloud_factory
123         self._cloud_driver = self._new_cloud()
124         self._logger = logging.getLogger('arvnodeman.daemon')
125         self._later = self.actor_ref.proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.max_total_price = max_total_price
132         self.poll_stale_after = poll_stale_after
133         self.boot_fail_after = boot_fail_after
134         self.node_stale_after = node_stale_after
135         self.last_polls = {}
136         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
137             poll_actor = locals()[poll_name + '_actor']
138             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
139             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
140             self.last_polls[poll_name] = -self.poll_stale_after
141         self.cloud_nodes = _CloudNodeTracker()
142         self.arvados_nodes = _ArvadosNodeTracker()
143         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
144         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
145         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
146         self._logger.debug("Daemon initialized")
147
148     def _update_poll_time(self, poll_key):
149         self.last_polls[poll_key] = time.time()
150
151     def _pair_nodes(self, node_record, arvados_node):
152         self._logger.info("Cloud node %s has associated with Arvados node %s",
153                           node_record.cloud_node.id, arvados_node['uuid'])
154         self._arvados_nodes_actor.subscribe_to(
155             arvados_node['uuid'], node_record.actor.update_arvados_node)
156         node_record.arvados_node = arvados_node
157         self.arvados_nodes.add(node_record)
158
159     def _new_node(self, cloud_node):
160         start_time = self._cloud_driver.node_start_time(cloud_node)
161         shutdown_timer = cnode.ShutdownTimer(start_time,
162                                              self.shutdown_windows)
163         actor = self._node_actor.start(
164             cloud_node=cloud_node,
165             cloud_node_start_time=start_time,
166             shutdown_timer=shutdown_timer,
167             cloud_fqdn_func=self._cloud_driver.node_fqdn,
168             update_actor=self._cloud_updater,
169             timer_actor=self._timer,
170             arvados_node=None,
171             poll_stale_after=self.poll_stale_after,
172             node_stale_after=self.node_stale_after,
173             cloud_client=self._cloud_driver,
174             boot_fail_after=self.boot_fail_after).proxy()
175         actor.subscribe(self._later.node_can_shutdown)
176         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
177                                              actor.update_cloud_node)
178         record = _ComputeNodeRecord(actor, cloud_node)
179         return record
180
181     def update_cloud_nodes(self, nodelist):
182         self._update_poll_time('cloud_nodes')
183         for key, node in self.cloud_nodes.update_from(nodelist):
184             self._logger.info("Registering new cloud node %s", key)
185             if key in self.booted:
186                 record = self.booted.pop(key)
187             else:
188                 record = self._new_node(node)
189             self.cloud_nodes.add(record)
190             for arv_rec in self.arvados_nodes.unpaired():
191                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
192                     self._pair_nodes(record, arv_rec.arvados_node)
193                     break
194         for key, record in self.cloud_nodes.orphans.iteritems():
195             if key in self.shutdowns:
196                 try:
197                     self.shutdowns[key].stop().get()
198                 except pykka.ActorDeadError:
199                     pass
200                 del self.shutdowns[key]
201             record.actor.stop()
202             record.cloud_node = None
203
204     def update_arvados_nodes(self, nodelist):
205         self._update_poll_time('arvados_nodes')
206         for key, node in self.arvados_nodes.update_from(nodelist):
207             self._logger.info("Registering new Arvados node %s", key)
208             record = _ComputeNodeRecord(arvados_node=node)
209             self.arvados_nodes.add(record)
210         for arv_rec in self.arvados_nodes.unpaired():
211             arv_node = arv_rec.arvados_node
212             for cloud_rec in self.cloud_nodes.unpaired():
213                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
214                     self._pair_nodes(cloud_rec, arv_node)
215                     break
216
217     def _nodes_up(self, size):
218         up = 0
219         up += sum(1
220                   for c in self.booting.itervalues()
221                   if size is None or c.cloud_size.get().id == size.id)
222         up += sum(1
223                   for i in (self.booted, self.cloud_nodes.nodes)
224                   for c in i.itervalues()
225                   if size is None or c.cloud_node.size.id == size.id)
226         return up
227
228     def _total_price(self):
229         cost = 0
230         cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
231                   for c in self.booting.itervalues())
232         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
233                     for i in (self.booted, self.cloud_nodes.nodes)
234                     for c in i.itervalues())
235         return cost
236
237     def _nodes_busy(self, size):
238         return sum(1 for busy in
239                    pykka.get_all(rec.actor.in_state('busy') for rec in
240                                  self.cloud_nodes.nodes.itervalues()
241                                  if rec.cloud_node.size.id == size.id)
242                    if busy)
243
244     def _nodes_missing(self, size):
245         return sum(1 for arv_node in
246                    pykka.get_all(rec.actor.arvados_node for rec in
247                                  self.cloud_nodes.nodes.itervalues()
248                                  if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
249                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
250
251     def _size_wishlist(self, size):
252         return sum(1 for c in self.last_wishlist if c.id == size.id)
253
254     def _size_shutdowns(self, size):
255         sh = 0
256         for c in self.shutdowns.itervalues():
257             try:
258                 if c.cloud_node.get().size.id == size.id:
259                     sh += 1
260             except pykka.ActorDeadError:
261                 pass
262         return sh
263
264     def _nodes_wanted(self, size):
265         total_up_count = self._nodes_up(None)
266         under_min = self.min_nodes - total_up_count
267         over_max = total_up_count - self.max_nodes
268         total_price = self._total_price()
269
270         if over_max >= 0:
271             return -over_max
272         elif under_min > 0 and size.id == self.min_cloud_size.id:
273             return under_min
274
275         up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
276                                            self._nodes_busy(size) +
277                                            self._nodes_missing(size))
278
279         self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
280
281         wanted = self._size_wishlist(size) - up_count
282         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
283             can_boot = int((self.max_total_price - total_price) / size.price)
284             if can_boot == 0:
285                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
286                                   size.name, size.price, self.max_total_price, total_price)
287             return can_boot
288         else:
289             return wanted
290
291     def _nodes_excess(self, size):
292         up_count = self._nodes_up(size) - self._size_shutdowns(size)
293         if size.id == self.min_cloud_size.id:
294             up_count -= self.min_nodes
295         return up_count - self._nodes_busy(size) - self._size_wishlist(size)
296
297     def update_server_wishlist(self, wishlist):
298         self._update_poll_time('server_wishlist')
299         self.last_wishlist = wishlist
300         for size in reversed(self.server_calculator.cloud_sizes):
301             nodes_wanted = self._nodes_wanted(size)
302             if nodes_wanted > 0:
303                 self._later.start_node(size)
304             elif (nodes_wanted < 0) and self.booting:
305                 self._later.stop_booting_node(size)
306
307     def _check_poll_freshness(orig_func):
308         """Decorator to inhibit a method when poll information is stale.
309
310         This decorator checks the timestamps of all the poll information the
311         daemon has received.  The decorated method is only called if none
312         of the timestamps are considered stale.
313         """
314         @functools.wraps(orig_func)
315         def wrapper(self, *args, **kwargs):
316             now = time.time()
317             if all(now - t < self.poll_stale_after
318                    for t in self.last_polls.itervalues()):
319                 return orig_func(self, *args, **kwargs)
320             else:
321                 return None
322         return wrapper
323
324     @_check_poll_freshness
325     def start_node(self, cloud_size):
326         nodes_wanted = self._nodes_wanted(cloud_size)
327         if nodes_wanted < 1:
328             return None
329         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
330         self._logger.info("Want %s more nodes.  Booting a %s node.",
331                           nodes_wanted, cloud_size.name)
332         new_setup = self._node_setup.start(
333             timer_actor=self._timer,
334             arvados_client=self._new_arvados(),
335             arvados_node=arvados_node,
336             cloud_client=self._new_cloud(),
337             cloud_size=cloud_size).proxy()
338         self.booting[new_setup.actor_ref.actor_urn] = new_setup
339         if arvados_node is not None:
340             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
341                 time.time())
342         new_setup.subscribe(self._later.node_up)
343         if nodes_wanted > 1:
344             self._later.start_node(cloud_size)
345
346     def _get_actor_attrs(self, actor, *attr_names):
347         return pykka.get_all([getattr(actor, name) for name in attr_names])
348
349     def node_up(self, setup_proxy):
350         cloud_node = setup_proxy.cloud_node.get()
351         del self.booting[setup_proxy.actor_ref.actor_urn]
352         setup_proxy.stop()
353         record = self.cloud_nodes.get(cloud_node.id)
354         if record is None:
355             record = self._new_node(cloud_node)
356             self.booted[cloud_node.id] = record
357         self._timer.schedule(time.time() + self.boot_fail_after,
358                              self._later.shutdown_unpaired_node, cloud_node.id)
359
360     @_check_poll_freshness
361     def stop_booting_node(self, size):
362         nodes_excess = self._nodes_excess(size)
363         if (nodes_excess < 1) or not self.booting:
364             return None
365         for key, node in self.booting.iteritems():
366             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
367                 del self.booting[key]
368                 if nodes_excess > 1:
369                     self._later.stop_booting_node(size)
370                 break
371
372     def _begin_node_shutdown(self, node_actor, cancellable):
373         cloud_node_id = node_actor.cloud_node.get().id
374         if cloud_node_id in self.shutdowns:
375             return None
376         shutdown = self._node_shutdown.start(
377             timer_actor=self._timer, cloud_client=self._new_cloud(),
378             arvados_client=self._new_arvados(),
379             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
380         self.shutdowns[cloud_node_id] = shutdown
381         shutdown.subscribe(self._later.node_finished_shutdown)
382
383     @_check_poll_freshness
384     def node_can_shutdown(self, node_actor):
385         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
386             self._begin_node_shutdown(node_actor, cancellable=True)
387
388     def shutdown_unpaired_node(self, cloud_node_id):
389         for record_dict in [self.cloud_nodes, self.booted]:
390             if cloud_node_id in record_dict:
391                 record = record_dict[cloud_node_id]
392                 break
393         else:
394             return None
395         if not record.actor.in_state('idle', 'busy').get():
396             self._begin_node_shutdown(record.actor, cancellable=False)
397
398     def node_finished_shutdown(self, shutdown_actor):
399         cloud_node, success, cancel_reason = self._get_actor_attrs(
400             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
401         shutdown_actor.stop()
402         cloud_node_id = cloud_node.id
403         if not success:
404             if cancel_reason == self._node_shutdown.NODE_BROKEN:
405                 self.cloud_nodes.blacklist(cloud_node_id)
406             del self.shutdowns[cloud_node_id]
407         elif cloud_node_id in self.booted:
408             self.booted.pop(cloud_node_id).actor.stop()
409             del self.shutdowns[cloud_node_id]
410
411     def shutdown(self):
412         self._logger.info("Shutting down after signal.")
413         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
414         setup_stops = {key: node.stop_if_no_cloud_node()
415                        for key, node in self.booting.iteritems()}
416         self.booting = {key: self.booting[key]
417                         for key in setup_stops if not setup_stops[key].get()}
418         self._later.await_shutdown()
419
420     def await_shutdown(self):
421         if self.booting:
422             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
423         else:
424             self.stop()