Fix for _size_shutdowns and node prices in node manager refs #5353
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
class _ComputeNodeRecord(object):
    """Plain holder for the pieces of state kept about one compute node.

    Every field is optional: a record may be created with only a cloud
    node, only an Arvados node, or neither.  `assignment_time` defaults to
    negative infinity, i.e. earlier than any real timestamp.
    """
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor, self.cloud_node = actor, cloud_node
        self.arvados_node, self.assignment_time = arvados_node, assignment_time
22
23
class _BaseNodeTracker(object):
    """Keep a dictionary of node records keyed by one of their attributes.

    Subclasses must provide three class attributes:

    * RECORD_ATTR: name of the record attribute holding the item this
      tracker indexes.
    * PAIR_ATTR: name of the record attribute that is None while the
      record is still unpaired.
    * item_key: callable that maps an item to its dictionary key.
    """
    def __init__(self):
        self.nodes = {}          # key -> record currently tracked
        self.orphans = {}        # records dropped by the last update_from()
        self._blacklist = set()  # keys that update_from() skips

    def _proxy_method(name):
        """Return a method that calls dict's `name` method on self.nodes."""
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    # Proxy these dict methods to self.nodes.  They are bound explicitly
    # rather than by writing into locals(), and the factory is removed
    # afterwards so it does not linger as a bogus class attribute.
    __contains__ = _proxy_method('__contains__')
    __getitem__ = _proxy_method('__getitem__')
    __len__ = _proxy_method('__len__')
    get = _proxy_method('get')
    del _proxy_method

    def record_key(self, record):
        """Return the dictionary key for `record`."""
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        """Track `record`, replacing any record stored under the same key."""
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        """Make update_from() skip items with this key from now on."""
        self._blacklist.add(key)

    def update_record(self, key, item):
        """Store `item` on the record already tracked under `key`."""
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
        """Reconcile tracked records with a fresh list of items.

        This is a generator.  Items whose key is already tracked refresh
        the existing record; blacklisted items are skipped; every other
        item is yielded as a `(key, item)` pair for the caller to register.
        Once the generator is exhausted, every record that was tracked at
        the start but not refreshed is moved from `self.nodes` to
        `self.orphans`.
        """
        unseen = set(self.nodes)
        for item in response:
            key = self.item_key(item)
            if key in self._blacklist:
                continue
            elif key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        """Return an iterator over records whose PAIR_ATTR is None."""
        return (record for record in self.nodes.values()
                if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Tracker that indexes records by the `id` of their cloud node."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Tracker that indexes records by the 'uuid' of their Arvados node."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        return arvados_node['uuid']

    def find_stale_node(self, stale_time):
        """Return the first tracked Arvados node that is stale, or None.

        A node qualifies when `cnode.timestamp_fresh` reports false for
        both its modification time and its record's assignment time.
        """
        for record in self.nodes.itervalues():
            candidate = record.arvados_node
            if cnode.timestamp_fresh(cnode.arvados_node_mtime(candidate),
                                     stale_time):
                continue
            if cnode.timestamp_fresh(record.assignment_time, stale_time):
                continue
            return candidate
        return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor,
114                  max_total_price=0):
115         super(NodeManagerDaemonActor, self).__init__()
116         self._node_setup = node_setup_class
117         self._node_shutdown = node_shutdown_class
118         self._node_actor = node_actor_class
119         self._cloud_updater = cloud_update_actor
120         self._timer = timer_actor
121         self._new_arvados = arvados_factory
122         self._new_cloud = cloud_factory
123         self._cloud_driver = self._new_cloud()
124         self._logger = logging.getLogger('arvnodeman.daemon')
125         self._later = self.actor_ref.proxy()
126         self.shutdown_windows = shutdown_windows
127         self.server_calculator = server_calculator
128         self.min_cloud_size = self.server_calculator.cheapest_size()
129         self.min_nodes = min_nodes
130         self.max_nodes = max_nodes
131         self.max_total_price = max_total_price
132         self.poll_stale_after = poll_stale_after
133         self.boot_fail_after = boot_fail_after
134         self.node_stale_after = node_stale_after
135         self.last_polls = {}
136         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
137             poll_actor = locals()[poll_name + '_actor']
138             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
139             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
140             self.last_polls[poll_name] = -self.poll_stale_after
141         self.cloud_nodes = _CloudNodeTracker()
142         self.arvados_nodes = _ArvadosNodeTracker()
143         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
144         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
145         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
146         self._logger.debug("Daemon initialized")
147
148     def _update_poll_time(self, poll_key):
149         self.last_polls[poll_key] = time.time()
150
151     def _pair_nodes(self, node_record, arvados_node):
152         self._logger.info("Cloud node %s has associated with Arvados node %s",
153                           node_record.cloud_node.id, arvados_node['uuid'])
154         self._arvados_nodes_actor.subscribe_to(
155             arvados_node['uuid'], node_record.actor.update_arvados_node)
156         node_record.arvados_node = arvados_node
157         self.arvados_nodes.add(node_record)
158
159     def _new_node(self, cloud_node):
160         start_time = self._cloud_driver.node_start_time(cloud_node)
161         shutdown_timer = cnode.ShutdownTimer(start_time,
162                                              self.shutdown_windows)
163         actor = self._node_actor.start(
164             cloud_node=cloud_node,
165             cloud_node_start_time=start_time,
166             shutdown_timer=shutdown_timer,
167             cloud_fqdn_func=self._cloud_driver.node_fqdn,
168             update_actor=self._cloud_updater,
169             timer_actor=self._timer,
170             arvados_node=None,
171             poll_stale_after=self.poll_stale_after,
172             node_stale_after=self.node_stale_after,
173             cloud_client=self._cloud_driver,
174             boot_fail_after=self.boot_fail_after).proxy()
175         actor.subscribe(self._later.node_can_shutdown)
176         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
177                                              actor.update_cloud_node)
178         record = _ComputeNodeRecord(actor, cloud_node)
179         return record
180
181     def update_cloud_nodes(self, nodelist):
182         self._update_poll_time('cloud_nodes')
183         for key, node in self.cloud_nodes.update_from(nodelist):
184             self._logger.info("Registering new cloud node %s", key)
185             if key in self.booted:
186                 record = self.booted.pop(key)
187             else:
188                 record = self._new_node(node)
189             self.cloud_nodes.add(record)
190             for arv_rec in self.arvados_nodes.unpaired():
191                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
192                     self._pair_nodes(record, arv_rec.arvados_node)
193                     break
194         for key, record in self.cloud_nodes.orphans.iteritems():
195             if key in self.shutdowns:
196                 try:
197                     self.shutdowns[key].stop().get()
198                 except pykka.ActorDeadError:
199                     pass
200                 del self.shutdowns[key]
201             record.actor.stop()
202             record.cloud_node = None
203
204     def update_arvados_nodes(self, nodelist):
205         self._update_poll_time('arvados_nodes')
206         for key, node in self.arvados_nodes.update_from(nodelist):
207             self._logger.info("Registering new Arvados node %s", key)
208             record = _ComputeNodeRecord(arvados_node=node)
209             self.arvados_nodes.add(record)
210         for arv_rec in self.arvados_nodes.unpaired():
211             arv_node = arv_rec.arvados_node
212             for cloud_rec in self.cloud_nodes.unpaired():
213                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
214                     self._pair_nodes(cloud_rec, arv_node)
215                     break
216
217     def _nodes_up(self, size):
218         up = 0
219         up += sum(1
220                   for c in self.booting.itervalues()
221                   if size is None or c.cloud_size.get().id == size.id)
222         up += sum(1
223                   for i in (self.booted, self.cloud_nodes.nodes)
224                   for c in i.itervalues()
225                   if size is None or c.cloud_node.size.id == size.id)
226         return up
227
228     def _total_price(self):
229         cost = 0
230         cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
231                   for c in self.booting.itervalues())
232         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
233                     for i in (self.booted, self.cloud_nodes.nodes)
234                     for c in i.itervalues())
235         return cost
236
237     def _nodes_busy(self, size):
238         return sum(1 for busy in
239                    pykka.get_all(rec.actor.in_state('busy') for rec in
240                                  self.cloud_nodes.nodes.itervalues()
241                                  if rec.cloud_node.size.id == size.id)
242                    if busy)
243
244     def _nodes_missing(self, size):
245         return sum(1 for arv_node in
246                    pykka.get_all(rec.actor.arvados_node for rec in
247                                  self.cloud_nodes.nodes.itervalues()
248                                  if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
249                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
250
251     def _size_wishlist(self, size):
252         return sum(1 for c in self.last_wishlist if c.id == size.id)
253
254     def _size_shutdowns(self, size):
255         sh = 0
256         for c in self.shutdowns.itervalues():
257             try:
258                 if c.cloud_node.get().size.id == size.id:
259                     sh += 1
260             except pykka.ActorDeadError:
261                 pass
262         return sh
263
264     def _nodes_wanted(self, size):
265         total_up_count = self._nodes_up(None)
266         under_min = self.min_nodes - total_up_count
267         over_max = total_up_count - self.max_nodes
268         total_price = self._total_price()
269
270         if over_max >= 0:
271             return -over_max
272         elif under_min > 0 and size.id == self.min_cloud_size.id:
273             return under_min
274
275         up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
276                                            self._nodes_busy(size) +
277                                            self._nodes_missing(size))
278
279         wanted = self._size_wishlist(size) - up_count
280         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
281             can_boot = int((self.max_total_price - total_price) / size.price)
282             if can_boot == 0:
283                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
284                                   size.name, size.price, self.max_total_price, total_price)
285             return can_boot
286         else:
287             return wanted
288
289     def _nodes_excess(self, size):
290         up_count = self._nodes_up(size) - self._size_shutdowns(size)
291         if size.id == self.min_cloud_size.id:
292             up_count -= self.min_nodes
293         return up_count - self._nodes_busy(size) - self._size_wishlist(size)
294
295     def update_server_wishlist(self, wishlist):
296         self._update_poll_time('server_wishlist')
297         self.last_wishlist = wishlist
298         for size in reversed(self.server_calculator.cloud_sizes):
299             nodes_wanted = self._nodes_wanted(size)
300             if nodes_wanted > 0:
301                 self._later.start_node(size)
302             elif (nodes_wanted < 0) and self.booting:
303                 self._later.stop_booting_node(size)
304
305     def _check_poll_freshness(orig_func):
306         """Decorator to inhibit a method when poll information is stale.
307
308         This decorator checks the timestamps of all the poll information the
309         daemon has received.  The decorated method is only called if none
310         of the timestamps are considered stale.
311         """
312         @functools.wraps(orig_func)
313         def wrapper(self, *args, **kwargs):
314             now = time.time()
315             if all(now - t < self.poll_stale_after
316                    for t in self.last_polls.itervalues()):
317                 return orig_func(self, *args, **kwargs)
318             else:
319                 return None
320         return wrapper
321
322     @_check_poll_freshness
323     def start_node(self, cloud_size):
324         nodes_wanted = self._nodes_wanted(cloud_size)
325         if nodes_wanted < 1:
326             return None
327         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
328         self._logger.info("Want %s more nodes.  Booting a %s node.",
329                           nodes_wanted, cloud_size.name)
330         new_setup = self._node_setup.start(
331             timer_actor=self._timer,
332             arvados_client=self._new_arvados(),
333             arvados_node=arvados_node,
334             cloud_client=self._new_cloud(),
335             cloud_size=cloud_size).proxy()
336         self.booting[new_setup.actor_ref.actor_urn] = new_setup
337         if arvados_node is not None:
338             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
339                 time.time())
340         new_setup.subscribe(self._later.node_up)
341         if nodes_wanted > 1:
342             self._later.start_node(cloud_size)
343
344     def _get_actor_attrs(self, actor, *attr_names):
345         return pykka.get_all([getattr(actor, name) for name in attr_names])
346
347     def node_up(self, setup_proxy):
348         cloud_node = setup_proxy.cloud_node.get()
349         del self.booting[setup_proxy.actor_ref.actor_urn]
350         setup_proxy.stop()
351         record = self.cloud_nodes.get(cloud_node.id)
352         if record is None:
353             record = self._new_node(cloud_node)
354             self.booted[cloud_node.id] = record
355         self._timer.schedule(time.time() + self.boot_fail_after,
356                              self._later.shutdown_unpaired_node, cloud_node.id)
357
358     @_check_poll_freshness
359     def stop_booting_node(self, size):
360         nodes_excess = self._nodes_excess(size)
361         if (nodes_excess < 1) or not self.booting:
362             return None
363         for key, node in self.booting.iteritems():
364             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
365                 del self.booting[key]
366                 if nodes_excess > 1:
367                     self._later.stop_booting_node(size)
368                 break
369
370     def _begin_node_shutdown(self, node_actor, cancellable):
371         cloud_node_id = node_actor.cloud_node.get().id
372         if cloud_node_id in self.shutdowns:
373             return None
374         shutdown = self._node_shutdown.start(
375             timer_actor=self._timer, cloud_client=self._new_cloud(),
376             arvados_client=self._new_arvados(),
377             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
378         self.shutdowns[cloud_node_id] = shutdown
379         shutdown.subscribe(self._later.node_finished_shutdown)
380
381     @_check_poll_freshness
382     def node_can_shutdown(self, node_actor):
383         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
384             self._begin_node_shutdown(node_actor, cancellable=True)
385
386     def shutdown_unpaired_node(self, cloud_node_id):
387         for record_dict in [self.cloud_nodes, self.booted]:
388             if cloud_node_id in record_dict:
389                 record = record_dict[cloud_node_id]
390                 break
391         else:
392             return None
393         if not record.actor.in_state('idle', 'busy').get():
394             self._begin_node_shutdown(record.actor, cancellable=False)
395
396     def node_finished_shutdown(self, shutdown_actor):
397         cloud_node, success, cancel_reason = self._get_actor_attrs(
398             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
399         shutdown_actor.stop()
400         cloud_node_id = cloud_node.id
401         if not success:
402             if cancel_reason == self._node_shutdown.NODE_BROKEN:
403                 self.cloud_nodes.blacklist(cloud_node_id)
404             del self.shutdowns[cloud_node_id]
405         elif cloud_node_id in self.booted:
406             self.booted.pop(cloud_node_id).actor.stop()
407             del self.shutdowns[cloud_node_id]
408
409     def shutdown(self):
410         self._logger.info("Shutting down after signal.")
411         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
412         setup_stops = {key: node.stop_if_no_cloud_node()
413                        for key, node in self.booting.iteritems()}
414         self.booting = {key: self.booting[key]
415                         for key in setup_stops if not setup_stops[key].get()}
416         self._later.await_shutdown()
417
418     def await_shutdown(self):
419         if self.booting:
420             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
421         else:
422             self.stop()