5353: Remove extra assertion because busywait does it for us.
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracks compute node records keyed by cloud node ID."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        """Return the tracking key for a cloud node: its `id`."""
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracks compute node records keyed by Arvados node UUID."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        """Return the tracking key for an Arvados node: its 'uuid'."""
        return arvados_node['uuid']

    def find_stale_node(self, stale_time):
        """Return an Arvados node record that looks reusable, or None.

        A record qualifies when cnode.timestamp_fresh reports neither its
        Arvados modification time nor its assignment_time as fresh within
        `stale_time`.  The first qualifying record in self.nodes iteration
        order wins.  NOTE(review): records that are currently paired with a
        cloud node are not excluded here.
        """
        def is_stale(record):
            # not (A or B) == (not A and not B), with the same
            # short-circuit: the assignment time is checked only when the
            # mtime is not fresh.
            return not (
                cnode.timestamp_fresh(
                    cnode.arvados_node_mtime(record.arvados_node), stale_time)
                or cnode.timestamp_fresh(record.assignment_time, stale_time))

        candidates = (rec.arvados_node for rec in self.nodes.itervalues()
                      if is_stale(rec))
        return next(candidates, None)
91
92
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        """Wire the daemon to its poll actors and record its policy limits.

        Arguments:
        * server_wishlist_actor, arvados_nodes_actor, cloud_nodes_actor:
          poll actors.  The daemon subscribes update_server_wishlist,
          update_arvados_nodes and update_cloud_nodes to them.
        * cloud_update_actor: passed to each node monitor actor.
        * timer_actor: used to schedule delayed calls; also passed to the
          setup, shutdown and monitor actors.
        * arvados_factory, cloud_factory: zero-argument callables that
          return new Arvados and cloud clients.
        * shutdown_windows: passed to cnode.ShutdownTimer for each node.
        * server_calculator: provides cheapest_size() and cloud_sizes.
        * min_nodes, max_nodes: bounds on the total number of nodes up.
        * poll_stale_after: seconds after which poll data is stale.  While
          any poll is stale, methods decorated with _check_poll_freshness
          do nothing.
        * boot_fail_after: seconds after a node comes up before
          shutdown_unpaired_node runs for it.
        * node_stale_after: age in seconds used to judge Arvados node
          records stale or missing.
        * node_setup_class, node_shutdown_class, node_actor_class: actor
          classes used to boot, shut down, and monitor nodes.
        * max_total_price: cap on the summed price of nodes.  A false value
          (the default 0) disables the cap.
        """
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._logger = logging.getLogger('arvnodeman.daemon')
        # Method calls through this proxy are delivered as messages to this
        # actor instead of running inline.
        self._later = self.actor_ref.proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            # Look up the matching constructor argument by name.
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            # Start every poll out as stale, so nothing is started or
            # stopped until each poll has reported at least once.
            self.last_polls[poll_name] = -self.poll_stale_after
        # Most recent wishlist.  Empty until the first wishlist poll, so
        # size calculations never hit an unset attribute.
        self.last_wishlist = []
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor URNs to ComputeNodeSetupActor proxies
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self._logger.debug("Daemon initialized")

    def _update_poll_time(self, poll_key):
        """Record that the poll named `poll_key` just delivered data."""
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        """Pair a cloud node record with an Arvados node.

        The record's monitor actor is subscribed to updates for that Arvados
        node.  The record is then stored in arvados_nodes under the node's
        UUID, replacing any record already there.
        """
        self._logger.info("Cloud node %s has associated with Arvados node %s",
                          node_record.cloud_node.id, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        """Start a monitor actor for `cloud_node` and return a new record.

        The daemon subscribes to the actor's shutdown-eligibility events.
        The actor is subscribed to cloud poll updates for its node.
        """
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after).proxy()
        actor.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor.update_cloud_node)
        record = _ComputeNodeRecord(actor, cloud_node)
        return record

    def update_cloud_nodes(self, nodelist):
        """Reconcile tracked cloud nodes against a fresh cloud listing.

        New nodes are registered, reusing the record made in node_up if this
        daemon booted the node, and offered to an unpaired Arvados node.
        Tracked nodes missing from the listing have any shutdown actor
        stopped, and their monitor actor stopped.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                # Booted by this daemon: its monitor actor already exists.
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    # Pair with at most one Arvados node.
                    break
        # cloud_nodes.orphans is only refreshed once update_from has been
        # fully consumed, which the loop above guarantees.
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
            record.actor.stop()
            # Clearing cloud_node makes a paired Arvados record show up as
            # unpaired again.
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        """Reconcile tracked Arvados nodes and pair any unpaired ones."""
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

    def _nodes_up(self, size):
        """Count booting, booted, and listed nodes of `size` (None = any)."""
        up = 0
        up += sum(1
                  for c in self.booting.itervalues()
                  if size is None or c.cloud_size.get().id == size.id)
        up += sum(1
                  for i in (self.booted, self.cloud_nodes.nodes)
                  for c in i.itervalues()
                  if size is None or (c.cloud_node.size and c.cloud_node.size.id == size.id))
        return up

    def _total_price(self):
        """Sum the prices of booting, booted, and listed nodes.

        Nodes whose size is unknown contribute nothing.
        """
        cost = 0
        cost += sum(c.cloud_size.get().price
                  for c in self.booting.itervalues())
        cost += sum(c.cloud_node.size.price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues()
                    if c.cloud_node.size)
        return cost

    def _nodes_busy(self, size):
        """Count listed nodes of `size` whose monitor is in state 'busy'."""
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if (rec.cloud_node.size and rec.cloud_node.size.id == size.id))
                   if busy)

    def _nodes_missing(self, size):
        """Count listed nodes of `size` whose Arvados record is missing.

        Nodes with a shutdown already in progress are excluded.
        """
        # Tracker keys are cloud node IDs, which is what self.shutdowns is
        # keyed by.  Using the key avoids a blocking actor round trip per node.
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node
                                 for key, rec in self.cloud_nodes.nodes.iteritems()
                                 if (rec.cloud_node.size and
                                     rec.cloud_node.size.id == size.id and
                                     key not in self.shutdowns))
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        """Count entries in the most recent wishlist matching `size`."""
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        """Count in-progress shutdowns of nodes of `size`."""
        cloud_nodes = pykka.get_all(c.cloud_node
                                    for c in self.shutdowns.itervalues())
        # Guard on a missing size, as _nodes_up and _nodes_busy do, instead
        # of raising AttributeError.
        return sum(1 for node in cloud_nodes
                   if node.size and node.size.id == size.id)

    def _nodes_wanted(self, size):
        """Return how many more nodes of `size` should be started.

        A positive result is the number to start.  Zero or negative means
        none: the result is negative when the total number of nodes up is
        over max_nodes.  It can also be negative when max_total_price is
        already exceeded.
        """
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            # The min_nodes floor is filled only with the cheapest size.
            return under_min

        # Shutting-down, busy, and missing nodes are not counted as
        # available for the wishlist.
        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
                                           self._nodes_busy(size) +
                                           self._nodes_missing(size))

        wanted = self._size_wishlist(size) - up_count
        if wanted > 0 and self.max_total_price:
            # Only price the fleet when the cap can matter; this queries
            # every booting actor.
            total_price = self._total_price()
            if (total_price + (size.price * wanted)) > self.max_total_price:
                can_boot = int((self.max_total_price - total_price) / size.price)
                if can_boot == 0:
                    self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                                      size.name, size.price, self.max_total_price, total_price)
                return can_boot
        return wanted

    def _nodes_excess(self, size):
        """Return how many up nodes of `size` are surplus to requirements.

        Nodes already shutting down, the min_nodes floor (cheapest size
        only), busy nodes, and wishlist demand are all subtracted.
        """
        up_count = self._nodes_up(size) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
        """Store a new wishlist and queue node starts/stops for each size."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for sz in reversed(self.server_calculator.cloud_sizes):
            size = sz.real
            nodes_wanted = self._nodes_wanted(size)
            if nodes_wanted > 0:
                self._later.start_node(size)
            elif (nodes_wanted < 0) and self.booting:
                self._later.stop_booting_node(size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Start booting one node of `cloud_size` if one is still wanted.

        A stale Arvados node record is reused when one is found.  If more
        nodes are wanted, another start_node call is queued.
        """
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %s more nodes.  Booting a %s node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        if arvados_node is not None:
            # Mark the reused record as freshly assigned, so find_stale_node
            # does not hand it out again right away.
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        """Fetch several attributes from `actor`, waiting on all of them."""
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        """Handle a finished node setup.

        The setup actor is stopped.  A monitor record is created in
        self.booted unless the cloud listing already has the node.  An
        unpaired-node check is scheduled boot_fail_after seconds from now.
        """
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        setup_proxy.stop()
        record = self.cloud_nodes.get(cloud_node.id)
        if record is None:
            record = self._new_node(cloud_node)
            self.booted[cloud_node.id] = record
        self._timer.schedule(time.time() + self.boot_fail_after,
                             self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one booting node of `size` if there is excess capacity.

        A setup is only cancelled if its stop_if_no_cloud_node() succeeds.
        If the excess is more than one, another call is queued.
        """
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                # Deleting during iteration is safe only because we break
                # immediately afterwards.
                del self.booting[key]
                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for `node_actor`'s node if none is running."""
        cloud_node_id = node_actor.cloud_node.get().id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
        self.shutdowns[cloud_node_id] = shutdown
        shutdown.subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        """Begin a cancellable shutdown if the node's size is in excess."""
        # NOTE(review): _nodes_excess dereferences size.id, so a node whose
        # size is None would raise here.  Confirm whether that can happen.
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        """Force-shut-down a node that never reached 'idle' or 'busy'."""
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        """Handle a shutdown actor finishing.

        On failure the shutdown entry is dropped, and the node is
        blacklisted if the cancel reason is NODE_BROKEN.  On success, a node
        that was only in self.booted is forgotten here.  Nodes that were in
        the cloud listing are cleaned up by update_cloud_nodes when they
        disappear from it.
        """
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                self.cloud_nodes.blacklist(cloud_node_id)
            del self.shutdowns[cloud_node_id]
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
            del self.shutdowns[cloud_node_id]

    def shutdown(self):
        """Stop starting/stopping nodes and wind down booting setups."""
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        # Keep only setups that could not be stopped; await_shutdown waits
        # for those.
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        """Stop this actor once no setups remain, polling every second."""
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()