7667: Fix log message
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
71 class _CloudNodeTracker(_BaseNodeTracker):
72     RECORD_ATTR = 'cloud_node'
73     PAIR_ATTR = 'arvados_node'
74     item_key = staticmethod(lambda cloud_node: cloud_node.id)
75
76
77 class _ArvadosNodeTracker(_BaseNodeTracker):
78     RECORD_ATTR = 'arvados_node'
79     PAIR_ATTR = 'cloud_node'
80     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
81
82     def find_stale_node(self, stale_time):
83         for record in self.nodes.itervalues():
84             node = record.arvados_node
85             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
86                                           stale_time) and
87                   not cnode.timestamp_fresh(record.assignment_time,
88                                             stale_time)):
89                 return node
90         return None
91
92
93 class NodeManagerDaemonActor(actor_class):
94     """Node Manager daemon.
95
96     This actor subscribes to all information polls about cloud nodes,
97     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
98     for every cloud node, subscribing them to poll updates
99     appropriately.  It creates and destroys cloud nodes based on job queue
100     demand, and stops the corresponding ComputeNode actors when their work
101     is done.
102     """
103     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
104                  cloud_nodes_actor, cloud_update_actor, timer_actor,
105                  arvados_factory, cloud_factory,
106                  shutdown_windows, server_calculator,
107                  min_nodes, max_nodes,
108                  poll_stale_after=600,
109                  boot_fail_after=1800,
110                  node_stale_after=7200,
111                  node_setup_class=dispatch.ComputeNodeSetupActor,
112                  node_shutdown_class=dispatch.ComputeNodeShutdownActor,
113                  node_actor_class=dispatch.ComputeNodeMonitorActor,
114                  max_total_price=0):
115         super(NodeManagerDaemonActor, self).__init__()
116         self._node_setup = node_setup_class
117         self._node_shutdown = node_shutdown_class
118         self._node_actor = node_actor_class
119         self._cloud_updater = cloud_update_actor
120         self._timer = timer_actor
121         self._new_arvados = arvados_factory
122         self._new_cloud = cloud_factory
123         self._cloud_driver = self._new_cloud()
124         self._later = self.actor_ref.proxy()
125         self.shutdown_windows = shutdown_windows
126         self.server_calculator = server_calculator
127         self.min_cloud_size = self.server_calculator.cheapest_size()
128         self.min_nodes = min_nodes
129         self.max_nodes = max_nodes
130         self.max_total_price = max_total_price
131         self.poll_stale_after = poll_stale_after
132         self.boot_fail_after = boot_fail_after
133         self.node_stale_after = node_stale_after
134         self.last_polls = {}
135         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
136             poll_actor = locals()[poll_name + '_actor']
137             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
138             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
139             self.last_polls[poll_name] = -self.poll_stale_after
140         self.cloud_nodes = _CloudNodeTracker()
141         self.arvados_nodes = _ArvadosNodeTracker()
142         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
143         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
144         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
145
146     def on_start(self):
147         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
148         self._logger.debug("Daemon started")
149
150     def _update_poll_time(self, poll_key):
151         self.last_polls[poll_key] = time.time()
152
153     def _pair_nodes(self, node_record, arvados_node):
154         self._logger.info("Cloud node %s is now paired with Arvados node %s",
155                           node_record.cloud_node.name, arvados_node['uuid'])
156         self._arvados_nodes_actor.subscribe_to(
157             arvados_node['uuid'], node_record.actor.update_arvados_node)
158         node_record.arvados_node = arvados_node
159         self.arvados_nodes.add(node_record)
160
161     def _new_node(self, cloud_node):
162         start_time = self._cloud_driver.node_start_time(cloud_node)
163         shutdown_timer = cnode.ShutdownTimer(start_time,
164                                              self.shutdown_windows)
165         actor = self._node_actor.start(
166             cloud_node=cloud_node,
167             cloud_node_start_time=start_time,
168             shutdown_timer=shutdown_timer,
169             cloud_fqdn_func=self._cloud_driver.node_fqdn,
170             update_actor=self._cloud_updater,
171             timer_actor=self._timer,
172             arvados_node=None,
173             poll_stale_after=self.poll_stale_after,
174             node_stale_after=self.node_stale_after,
175             cloud_client=self._cloud_driver,
176             boot_fail_after=self.boot_fail_after).proxy()
177         actor.subscribe(self._later.node_can_shutdown)
178         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
179                                              actor.update_cloud_node)
180         record = _ComputeNodeRecord(actor, cloud_node)
181         return record
182
183     def update_cloud_nodes(self, nodelist):
184         self._update_poll_time('cloud_nodes')
185         for key, node in self.cloud_nodes.update_from(nodelist):
186             self._logger.info("Registering new cloud node %s", key)
187             if key in self.booted:
188                 record = self.booted.pop(key)
189             else:
190                 record = self._new_node(node)
191             self.cloud_nodes.add(record)
192             for arv_rec in self.arvados_nodes.unpaired():
193                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
194                     self._pair_nodes(record, arv_rec.arvados_node)
195                     break
196         for key, record in self.cloud_nodes.orphans.iteritems():
197             if key in self.shutdowns:
198                 try:
199                     self.shutdowns[key].stop().get()
200                 except pykka.ActorDeadError:
201                     pass
202                 del self.shutdowns[key]
203             record.actor.stop()
204             record.cloud_node = None
205
206     def update_arvados_nodes(self, nodelist):
207         self._update_poll_time('arvados_nodes')
208         for key, node in self.arvados_nodes.update_from(nodelist):
209             self._logger.info("Registering new Arvados node %s", key)
210             record = _ComputeNodeRecord(arvados_node=node)
211             self.arvados_nodes.add(record)
212         for arv_rec in self.arvados_nodes.unpaired():
213             arv_node = arv_rec.arvados_node
214             for cloud_rec in self.cloud_nodes.unpaired():
215                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
216                     self._pair_nodes(cloud_rec, arv_node)
217                     break
218
219     def _nodes_booting(self, size):
220         s = sum(1
221                 for c in self.booting.itervalues()
222                 if size is None or c.cloud_size.get().id == size.id)
223         s += sum(1
224                  for c in self.booted.itervalues()
225                  if size is None or c.cloud_node.size.id == size.id)
226         return s
227
228     def _nodes_unpaired(self, size):
229         return sum(1
230                    for c in self.cloud_nodes.unpaired()
231                    if size is None or c.cloud_node.size.id == size.id)
232
233     def _nodes_booted(self, size):
234         return sum(1
235                   for c in self.cloud_nodes.nodes.itervalues()
236                   if size is None or c.cloud_node.size.id == size.id)
237
238     def _nodes_up(self, size):
239         up = self._nodes_booting(size) + self._nodes_booted(size)
240         return up
241
242     def _total_price(self):
243         cost = 0
244         cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
245                   for c in self.booting.itervalues())
246         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
247                     for i in (self.booted, self.cloud_nodes.nodes)
248                     for c in i.itervalues())
249         return cost
250
251     def _nodes_busy(self, size):
252         return sum(1 for busy in
253                    pykka.get_all(rec.actor.in_state('busy') for rec in
254                                  self.cloud_nodes.nodes.itervalues()
255                                  if rec.cloud_node.size.id == size.id)
256                    if busy)
257
258     def _nodes_missing(self, size):
259         return sum(1 for arv_node in
260                    pykka.get_all(rec.actor.arvados_node for rec in
261                                  self.cloud_nodes.nodes.itervalues()
262                                  if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
263                    if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))
264
265     def _size_wishlist(self, size):
266         return sum(1 for c in self.last_wishlist if c.id == size.id)
267
268     def _size_shutdowns(self, size):
269         sh = 0
270         for c in self.shutdowns.itervalues():
271             try:
272                 if c.cloud_node.get().size.id == size.id:
273                     sh += 1
274             except pykka.ActorDeadError:
275                 pass
276         return sh
277
278     def _nodes_wanted(self, size):
279         total_up_count = self._nodes_up(None)
280         under_min = self.min_nodes - total_up_count
281         over_max = total_up_count - self.max_nodes
282         total_price = self._total_price()
283
284         if over_max >= 0:
285             return -over_max
286         elif under_min > 0 and size.id == self.min_cloud_size.id:
287             return under_min
288
289         booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
290         shutdown_count = self._size_shutdowns(size)
291         busy_count = self._nodes_busy(size)
292         up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
293
294         self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
295                           self._size_wishlist(size),
296                           up_count + busy_count,
297                           booting_count,
298                           up_count - booting_count,
299                           busy_count,
300                           shutdown_count)
301
302         wanted = self._size_wishlist(size) - up_count
303         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
304             can_boot = int((self.max_total_price - total_price) / size.price)
305             if can_boot == 0:
306                 self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
307                                   size.name, size.price, self.max_total_price, total_price)
308             return can_boot
309         else:
310             return wanted
311
312     def _nodes_excess(self, size):
313         up_count = self._nodes_up(size) - self._size_shutdowns(size)
314         if size.id == self.min_cloud_size.id:
315             up_count -= self.min_nodes
316         return up_count - self._nodes_busy(size) - self._size_wishlist(size)
317
318     def update_server_wishlist(self, wishlist):
319         self._update_poll_time('server_wishlist')
320         self.last_wishlist = wishlist
321         for size in reversed(self.server_calculator.cloud_sizes):
322             try:
323                 nodes_wanted = self._nodes_wanted(size)
324                 if nodes_wanted > 0:
325                     self._later.start_node(size)
326                 elif (nodes_wanted < 0) and self.booting:
327                     self._later.stop_booting_node(size)
328             except Exception as e:
329                 self._logger.exception("while calculating nodes wanted for size %s", size)
330
331     def _check_poll_freshness(orig_func):
332         """Decorator to inhibit a method when poll information is stale.
333
334         This decorator checks the timestamps of all the poll information the
335         daemon has received.  The decorated method is only called if none
336         of the timestamps are considered stale.
337         """
338         @functools.wraps(orig_func)
339         def wrapper(self, *args, **kwargs):
340             now = time.time()
341             if all(now - t < self.poll_stale_after
342                    for t in self.last_polls.itervalues()):
343                 return orig_func(self, *args, **kwargs)
344             else:
345                 return None
346         return wrapper
347
348     @_check_poll_freshness
349     def start_node(self, cloud_size):
350         nodes_wanted = self._nodes_wanted(cloud_size)
351         if nodes_wanted < 1:
352             return None
353         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
354         self._logger.info("Want %i more %s nodes.  Booting a node.",
355                           nodes_wanted, cloud_size.name)
356         new_setup = self._node_setup.start(
357             timer_actor=self._timer,
358             arvados_client=self._new_arvados(),
359             arvados_node=arvados_node,
360             cloud_client=self._new_cloud(),
361             cloud_size=cloud_size).proxy()
362         self.booting[new_setup.actor_ref.actor_urn] = new_setup
363         if arvados_node is not None:
364             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
365                 time.time())
366         new_setup.subscribe(self._later.node_up)
367         if nodes_wanted > 1:
368             self._later.start_node(cloud_size)
369
370     def _get_actor_attrs(self, actor, *attr_names):
371         return pykka.get_all([getattr(actor, name) for name in attr_names])
372
373     def node_up(self, setup_proxy):
374         cloud_node = setup_proxy.cloud_node.get()
375         del self.booting[setup_proxy.actor_ref.actor_urn]
376         setup_proxy.stop()
377         if cloud_node is not None:
378             record = self.cloud_nodes.get(cloud_node.id)
379             if record is None:
380                 record = self._new_node(cloud_node)
381                 self.booted[cloud_node.id] = record
382             self._timer.schedule(time.time() + self.boot_fail_after,
383                                  self._later.shutdown_unpaired_node, cloud_node.id)
384
385     @_check_poll_freshness
386     def stop_booting_node(self, size):
387         nodes_excess = self._nodes_excess(size)
388         if (nodes_excess < 1) or not self.booting:
389             return None
390         for key, node in self.booting.iteritems():
391             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
392                 del self.booting[key]
393                 if nodes_excess > 1:
394                     self._later.stop_booting_node(size)
395                 break
396
397     def _begin_node_shutdown(self, node_actor, cancellable):
398         cloud_node_id = node_actor.cloud_node.get().id
399         if cloud_node_id in self.shutdowns:
400             return None
401         shutdown = self._node_shutdown.start(
402             timer_actor=self._timer, cloud_client=self._new_cloud(),
403             arvados_client=self._new_arvados(),
404             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
405         self.shutdowns[cloud_node_id] = shutdown
406         shutdown.subscribe(self._later.node_finished_shutdown)
407
408     @_check_poll_freshness
409     def node_can_shutdown(self, node_actor):
410         if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
411             self._begin_node_shutdown(node_actor, cancellable=True)
412
413     def shutdown_unpaired_node(self, cloud_node_id):
414         for record_dict in [self.cloud_nodes, self.booted]:
415             if cloud_node_id in record_dict:
416                 record = record_dict[cloud_node_id]
417                 break
418         else:
419             return None
420         if not record.actor.in_state('idle', 'busy').get():
421             self._begin_node_shutdown(record.actor, cancellable=False)
422
423     def node_finished_shutdown(self, shutdown_actor):
424         cloud_node, success, cancel_reason = self._get_actor_attrs(
425             shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
426         shutdown_actor.stop()
427         cloud_node_id = cloud_node.id
428         if not success:
429             if cancel_reason == self._node_shutdown.NODE_BROKEN:
430                 self.cloud_nodes.blacklist(cloud_node_id)
431             del self.shutdowns[cloud_node_id]
432         elif cloud_node_id in self.booted:
433             self.booted.pop(cloud_node_id).actor.stop()
434             del self.shutdowns[cloud_node_id]
435
436     def shutdown(self):
437         self._logger.info("Shutting down after signal.")
438         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
439         setup_stops = {key: node.stop_if_no_cloud_node()
440                        for key, node in self.booting.iteritems()}
441         self.booting = {key: self.booting[key]
442                         for key in setup_stops if not setup_stops[key].get()}
443         self._later.await_shutdown()
444
445     def await_shutdown(self):
446         if self.booting:
447             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
448         else:
449             self.stop()