Merge branch 'master' into 7255-manifests-in-datamanager
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
71 class _CloudNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'cloud_node'
73     PAIR_ATTR = 'arvados_node'
74     item_key = staticmethod(lambda cloud_node: cloud_node.id)
75
76
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78     RECORD_ATTR = 'arvados_node'
79     PAIR_ATTR = 'cloud_node'
80     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
81
82     def find_stale_node(self, stale_time):
83         for record in self.nodes.itervalues():
84             node = record.arvados_node
85             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
86                                           stale_time) and
87                   not cnode.timestamp_fresh(record.assignment_time,
88                                             stale_time)):
89                 return node
90         return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor,
114                  max_total_price=0):
115         super(NodeManagerDaemonActor, self).__init__()
116         self._node_setup = node_setup_class
117         self._node_shutdown = node_shutdown_class
118         self._node_actor = node_actor_class
119         self._cloud_updater = cloud_update_actor
120         self._timer = timer_actor
121         self._new_arvados = arvados_factory
122         self._new_cloud = cloud_factory
123         self._cloud_driver = self._new_cloud()
124         self._logger = logging.getLogger('arvnodeman.daemon')
125         self._later = self.actor_ref.proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.max_total_price = max_total_price
132         self.poll_stale_after = poll_stale_after
133         self.boot_fail_after = boot_fail_after
134         self.node_stale_after = node_stale_after
135         self.last_polls = {}
136         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
137             poll_actor = locals()[poll_name + '_actor']
138             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
139             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
140             self.last_polls[poll_name] = -self.poll_stale_after
141         self.cloud_nodes = _CloudNodeTracker()
142         self.arvados_nodes = _ArvadosNodeTracker()
143         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
144         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
145         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
146         self._logger.debug("Daemon initialized")
147
148     def _update_poll_time(self, poll_key):
149         self.last_polls[poll_key] = time.time()
150
151     def _pair_nodes(self, node_record, arvados_node):
152         self._logger.info("Cloud node %s has associated with Arvados node %s",
153                           node_record.cloud_node.id, arvados_node['uuid'])
154         self._arvados_nodes_actor.subscribe_to(
155             arvados_node['uuid'], node_record.actor.update_arvados_node)
156         node_record.arvados_node = arvados_node
157         self.arvados_nodes.add(node_record)
158
159     def _new_node(self, cloud_node):
160         start_time = self._cloud_driver.node_start_time(cloud_node)
161         shutdown_timer = cnode.ShutdownTimer(start_time,
162                                              self.shutdown_windows)
163         actor = self._node_actor.start(
164             cloud_node=cloud_node,
165             cloud_node_start_time=start_time,
166             shutdown_timer=shutdown_timer,
167             cloud_fqdn_func=self._cloud_driver.node_fqdn,
168             update_actor=self._cloud_updater,
169             timer_actor=self._timer,
170             arvados_node=None,
171             poll_stale_after=self.poll_stale_after,
172             node_stale_after=self.node_stale_after,
173             cloud_client=self._cloud_driver,
174             boot_fail_after=self.boot_fail_after).proxy()
175         actor.subscribe(self._later.node_can_shutdown)
176         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
177                                              actor.update_cloud_node)
178         record = _ComputeNodeRecord(actor, cloud_node)
179         return record
180
181     def update_cloud_nodes(self, nodelist):
182         self._update_poll_time('cloud_nodes')
183         for key, node in self.cloud_nodes.update_from(nodelist):
184             self._logger.info("Registering new cloud node %s", key)
185             if key in self.booted:
186                 record = self.booted.pop(key)
187             else:
188                 record = self._new_node(node)
189             self.cloud_nodes.add(record)
190             for arv_rec in self.arvados_nodes.unpaired():
191                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
192                     self._pair_nodes(record, arv_rec.arvados_node)
193                     break
194         for key, record in self.cloud_nodes.orphans.iteritems():
195             if key in self.shutdowns:
196                 try:
197                     self.shutdowns[key].stop().get()
198                 except pykka.ActorDeadError:
199                     pass
200                 del self.shutdowns[key]
201             record.actor.stop()
202             record.cloud_node = None
203
204     def update_arvados_nodes(self, nodelist):
205         self._update_poll_time('arvados_nodes')
206         for key, node in self.arvados_nodes.update_from(nodelist):
207             self._logger.info("Registering new Arvados node %s", key)
208             record = _ComputeNodeRecord(arvados_node=node)
209             self.arvados_nodes.add(record)
210         for arv_rec in self.arvados_nodes.unpaired():
211             arv_node = arv_rec.arvados_node
212             for cloud_rec in self.cloud_nodes.unpaired():
213                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
214                     self._pair_nodes(cloud_rec, arv_node)
215                     break
216
217     def _nodes_up(self, size):
218         up = 0
219         up += sum(1
220                   for c in self.booting.itervalues()
221                   if size is None or c.cloud_size.get().id == size.id)
222         up += sum(1
223                   for i in (self.booted, self.cloud_nodes.nodes)
224                   for c in i.itervalues()
225                   if size is None or c.cloud_node.size.id == size.id)
226         return up
227
228     def _total_price(self):
229         cost = 0
230         cost += sum(c.cloud_size.get().price
231                   for c in self.booting.itervalues())
232         cost += sum(c.cloud_node.size.price
233                     for i in (self.booted, self.cloud_nodes.nodes)
234                     for c in i.itervalues())
235         return cost
236
237     def _nodes_busy(self, size):
238         return sum(1 for busy in
239                    pykka.get_all(rec.actor.in_state('busy') for rec in
240                                  self.cloud_nodes.nodes.itervalues()
241                                  if rec.cloud_node.size.id == size.id)
242                    if busy)
243
244     def _nodes_missing(self, size):
245         return sum(1 for arv_node in
246                    pykka.get_all(rec.actor.arvados_node for rec in
247                                  self.cloud_nodes.nodes.itervalues()
248                                  if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
249                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
250
251     def _size_wishlist(self, size):
252         return sum(1 for c in self.last_wishlist if c.id == size.id)
253
254     def _size_shutdowns(self, size):
255         return sum(1 for c in self.shutdowns.itervalues()
256                    if c.cloud_node.get().size.id == size.id)
257
258     def _nodes_wanted(self, size):
259         total_up_count = self._nodes_up(None)
260         under_min = self.min_nodes - total_up_count
261         over_max = total_up_count - self.max_nodes
262         total_price = self._total_price()
263
264         if over_max >= 0:
265             return -over_max
266         elif under_min > 0 and size.id == self.min_cloud_size.id:
267             return under_min
268
269         up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
270                                            self._nodes_busy(size) +
271                                            self._nodes_missing(size))
272
273         wanted = self._size_wishlist(size) - up_count
274         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
275             can_boot = int((self.max_total_price - total_price) / size.price)
276             if can_boot == 0:
277                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
278                                   size.name, size.price, self.max_total_price, total_price)
279             return can_boot
280         else:
281             return wanted
282
283     def _nodes_excess(self, size):
284         up_count = self._nodes_up(size) - self._size_shutdowns(size)
285         if size.id == self.min_cloud_size.id:
286             up_count -= self.min_nodes
287         return up_count - self._nodes_busy(size) - self._size_wishlist(size)
288
289     def update_server_wishlist(self, wishlist):
290         self._update_poll_time('server_wishlist')
291         self.last_wishlist = wishlist
292         for sz in reversed(self.server_calculator.cloud_sizes):
293             size = sz.real
294             nodes_wanted = self._nodes_wanted(size)
295             if nodes_wanted > 0:
296                 self._later.start_node(size)
297             elif (nodes_wanted < 0) and self.booting:
298                 self._later.stop_booting_node(size)
299
300     def _check_poll_freshness(orig_func):
301         """Decorator to inhibit a method when poll information is stale.
302
303         This decorator checks the timestamps of all the poll information the
304         daemon has received.  The decorated method is only called if none
305         of the timestamps are considered stale.
306         """
307         @functools.wraps(orig_func)
308         def wrapper(self, *args, **kwargs):
309             now = time.time()
310             if all(now - t < self.poll_stale_after
311                    for t in self.last_polls.itervalues()):
312                 return orig_func(self, *args, **kwargs)
313             else:
314                 return None
315         return wrapper
316
317     @_check_poll_freshness
318     def start_node(self, cloud_size):
319         nodes_wanted = self._nodes_wanted(cloud_size)
320         if nodes_wanted < 1:
321             return None
322         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
323         self._logger.info("Want %s more nodes.  Booting a %s node.",
324                           nodes_wanted, cloud_size.name)
325         new_setup = self._node_setup.start(
326             timer_actor=self._timer,
327             arvados_client=self._new_arvados(),
328             arvados_node=arvados_node,
329             cloud_client=self._new_cloud(),
330             cloud_size=cloud_size).proxy()
331         self.booting[new_setup.actor_ref.actor_urn] = new_setup
332         if arvados_node is not None:
333             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
334                 time.time())
335         new_setup.subscribe(self._later.node_up)
336         if nodes_wanted > 1:
337             self._later.start_node(cloud_size)
338
339     def _get_actor_attrs(self, actor, *attr_names):
340         return pykka.get_all([getattr(actor, name) for name in attr_names])
341
342     def node_up(self, setup_proxy):
343         cloud_node = setup_proxy.cloud_node.get()
344         del self.booting[setup_proxy.actor_ref.actor_urn]
345         setup_proxy.stop()
346         record = self.cloud_nodes.get(cloud_node.id)
347         if record is None:
348             record = self._new_node(cloud_node)
349             self.booted[cloud_node.id] = record
350         self._timer.schedule(time.time() + self.boot_fail_after,
351                              self._later.shutdown_unpaired_node, cloud_node.id)
352
353     @_check_poll_freshness
354     def stop_booting_node(self, size):
355         nodes_excess = self._nodes_excess(size)
356         if (nodes_excess < 1) or not self.booting:
357             return None
358         for key, node in self.booting.iteritems():
359             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
360                 del self.booting[key]
361                 if nodes_excess > 1:
362                     self._later.stop_booting_node(size)
363                 break
364
365     def _begin_node_shutdown(self, node_actor, cancellable):
366         cloud_node_id = node_actor.cloud_node.get().id
367         if cloud_node_id in self.shutdowns:
368             return None
369         shutdown = self._node_shutdown.start(
370             timer_actor=self._timer, cloud_client=self._new_cloud(),
371             arvados_client=self._new_arvados(),
372             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
373         self.shutdowns[cloud_node_id] = shutdown
374         shutdown.subscribe(self._later.node_finished_shutdown)
375
376     @_check_poll_freshness
377     def node_can_shutdown(self, node_actor):
378         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
379             self._begin_node_shutdown(node_actor, cancellable=True)
380
381     def shutdown_unpaired_node(self, cloud_node_id):
382         for record_dict in [self.cloud_nodes, self.booted]:
383             if cloud_node_id in record_dict:
384                 record = record_dict[cloud_node_id]
385                 break
386         else:
387             return None
388         if not record.actor.in_state('idle', 'busy').get():
389             self._begin_node_shutdown(record.actor, cancellable=False)
390
391     def node_finished_shutdown(self, shutdown_actor):
392         cloud_node, success, cancel_reason = self._get_actor_attrs(
393             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
394         shutdown_actor.stop()
395         cloud_node_id = cloud_node.id
396         if not success:
397             if cancel_reason == self._node_shutdown.NODE_BROKEN:
398                 self.cloud_nodes.blacklist(cloud_node_id)
399             del self.shutdowns[cloud_node_id]
400         elif cloud_node_id in self.booted:
401             self.booted.pop(cloud_node_id).actor.stop()
402             del self.shutdowns[cloud_node_id]
403
404     def shutdown(self):
405         self._logger.info("Shutting down after signal.")
406         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
407         setup_stops = {key: node.stop_if_no_cloud_node()
408                        for key, node in self.booting.iteritems()}
409         self.booting = {key: self.booting[key]
410                         for key in setup_stops if not setup_stops[key].get()}
411         self._later.await_shutdown()
412
413     def await_shutdown(self):
414         if self.booting:
415             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
416         else:
417             self.stop()