8799: Nodes whose slurm_state is "down" are checked with sinfo and either reenabled...
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
#!/usr/bin/env python

from __future__ import absolute_import, print_function

import functools
import logging
import time

import pykka

from . import computenode as cnode
from .computenode import dispatch
from .config import actor_class

class _ComputeNodeRecord(object):
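    """A cloud node, its Arvados counterpart, and its monitor actor.

    assignment_time records when an existing Arvados node record was
    reserved for a node being booted."""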
    def __init__(self, actor=None, cloud_node=None, arvados_node=None,
                 assignment_time=float('-inf')):
        self.actor = actor
        self.cloud_node = cloud_node
        self.arvados_node = arvados_node
        self.assignment_time = assignment_time


class _BaseNodeTracker(object):
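    """Track node records by key, with dict-like access.

    Subclasses define RECORD_ATTR (the record attribute holding the
    tracked item), PAIR_ATTR (the attribute holding the item it pairs
    with), and item_key (a function mapping a tracked item to its key).
    """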
    def __init__(self):
        self.nodes = {}
        self.orphans = {}
        self._blacklist = set()

    # Proxy the methods listed below to self.nodes.
    def _proxy_method(name):
        method = getattr(dict, name)
        @functools.wraps(method, ('__name__', '__doc__'))
        def wrapper(self, *args, **kwargs):
            return method(self.nodes, *args, **kwargs)
        return wrapper

    for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
        locals()[_method_name] = _proxy_method(_method_name)

    def record_key(self, record):
        return self.item_key(getattr(record, self.RECORD_ATTR))

    def add(self, record):
        self.nodes[self.record_key(record)] = record

    def blacklist(self, key):
        self._blacklist.add(key)

    def update_record(self, key, item):
        setattr(self.nodes[key], self.RECORD_ATTR, item)

    def update_from(self, response):
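        """Update the tracked records from a poll response.

        Items with blacklisted keys are skipped.  Known records are
        updated in place; new (key, item) pairs are yielded so the caller
        can build records for them.  Records missing from the response
        are moved to self.orphans."""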
        unseen = set(self.nodes.iterkeys())
        for item in response:
            key = self.item_key(item)
            if key in self._blacklist:
                continue
            elif key in unseen:
                unseen.remove(key)
                self.update_record(key, item)
            else:
                yield key, item
        self.orphans = {key: self.nodes.pop(key) for key in unseen}

    def unpaired(self):
        return (record for record in self.nodes.itervalues()
                if getattr(record, self.PAIR_ATTR) is None)


class _CloudNodeTracker(_BaseNodeTracker):
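    """Track cloud node records, keyed by cloud node ID."""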
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'
    item_key = staticmethod(lambda cloud_node: cloud_node.id)


class _ArvadosNodeTracker(_BaseNodeTracker):
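    """Track Arvados node records, keyed by Arvados node UUID."""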
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'
    item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])

    def find_stale_node(self, stale_time):
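        """Return an Arvados node whose record has not been updated, and
        has not been assigned to a booting node, within stale_time; or
        None if every record is fresh."""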
        for record in self.nodes.itervalues():
            node = record.arvados_node
            if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
                                          stale_time) and
                  not cnode.timestamp_fresh(record.assignment_time,
                                            stale_time)):
                return node
        return None


class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
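        # Subscribe to each poll actor, and record when we last heard from
        # it so _check_poll_freshness can tell whether our view is current.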
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActors
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size

    def on_start(self):
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        self.last_polls[poll_key] = time.time()

    def _resume_node(self, node_record):
        node_record.actor.resume_node()

    def _pair_nodes(self, node_record, arvados_node):
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
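        """Start a ComputeNodeMonitorActor for cloud_node and return a
        record for it.  The monitor is subscribed to cloud node updates
        and notifies this daemon when the node can shut down."""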
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actorTell = actor.tell_proxy()
        actorTell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actorTell.update_cloud_node)
        record = _ComputeNodeRecord(actor.proxy(), cloud_node)
        return record

    def update_cloud_nodes(self, nodelist):
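        """Handle a poll of the cloud's node list.

        Create records and monitor actors for newly seen cloud nodes, try
        to pair them with unpaired Arvados records, and clean up after
        nodes that have disappeared from the cloud."""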
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass
                del self.shutdowns[key]
                del self.sizes_booting_shutdown[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
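        """Handle a poll of the Arvados node list.

        Register records for new Arvados nodes, try to pair unpaired
        records with cloud nodes, and resume any node whose slurm_state
        is "down", provided it is paired with a cloud node that is not
        already being shut down."""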
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break
        for rec in self.arvados_nodes.nodes.itervalues():
            if (rec.arvados_node["info"].get("slurm_state") == "down" and
                rec.cloud_node is not None and
                rec.cloud_node.id not in self.shutdowns):
                self._resume_node(rec)

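    # The _nodes_* and _size_* helpers below count nodes in various
    # states; some accept size=None to count nodes of every size.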
    def _nodes_booting(self, size):
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting_shutdown[c].id == size.id)
        s += sum(1
                 for c in self.booted.itervalues()
                 if size is None or c.cloud_node.size.id == size.id)
        return s

    def _nodes_unpaired(self, size):
        return sum(1
                   for c in self.cloud_nodes.unpaired()
                   if size is None or c.cloud_node.size.id == size.id)

    def _nodes_booted(self, size):
        return sum(1
                   for c in self.cloud_nodes.nodes.itervalues()
                   if size is None or c.cloud_node.size.id == size.id)

    def _nodes_down(self, size):
        return sum(1 for down in
                   pykka.get_all(rec.actor.in_state('down') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if size is None or rec.cloud_node.size.id == size.id)
                   if down)

    def _nodes_up(self, size):
        up = (self._nodes_booting(size) + self._nodes_booted(size)) - self._nodes_down(size)
        return up

    def _total_price(self):
        cost = 0
        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
                    for c in self.booting.iterkeys())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues())
        return cost

    def _nodes_busy(self, size):
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
                   if busy)

    def _nodes_missing(self, size):
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        sh = 0
        for c in self.shutdowns.iterkeys():
            try:
                if self.sizes_booting_shutdown[c].id == size.id:
                    sh += 1
            except KeyError:
                # The size record was removed while we were counting.
                pass
        return sh

    def _nodes_wanted(self, size):
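        """Return how many more nodes of this size should be booted.

        A negative result means this size has excess nodes.  The count
        honors min_nodes, max_nodes, and max_total_price."""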
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes
        total_price = self._total_price()

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
        shutdown_count = self._size_shutdowns(size)
        busy_count = self._nodes_busy(size)
        up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))

        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
                          self._size_wishlist(size),
                          up_count + busy_count,
                          booting_count,
                          up_count - booting_count,
                          busy_count,
                          shutdown_count)

        wanted = self._size_wishlist(size) - up_count
        if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
            can_boot = int((self.max_total_price - total_price) / size.price)
            if can_boot == 0:
                self._logger.info("Not booting %s (price %s) because doing so would exceed max_total_price of %s (current total_price is %s)",
                                  size.name, size.price, self.max_total_price, total_price)
            return can_boot
        else:
            return wanted

    def _nodes_excess(self, size):
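        """Return how many nodes of this size are running beyond what the
        wishlist and min_nodes require."""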
        up_count = self._nodes_up(size) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
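        """Handle a poll of the job queue's server wishlist, booting or
        canceling nodes of each size as needed."""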
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s", size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
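        """Boot one new node of cloud_size, reusing a stale Arvados node
        record if one is available, then reschedule if more are wanted."""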
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        new_setup = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size).proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        new_setup.subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
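        """Handle a setup actor finishing.

        Record the new cloud node (if the cloud node list poll has not
        seen it yet) and schedule a shutdown check for when the node is
        still unpaired after boot_fail_after seconds."""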
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]

        setup_proxy.stop()
        if cloud_node is not None:
            record = self.cloud_nodes.get(cloud_node.id)
            if record is None:
                record = self._new_node(cloud_node)
                self.booted[cloud_node.id] = record
            self._timer.schedule(time.time() + self.boot_fail_after,
                                 self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self, size):
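        """Cancel one booting node of this size, if its setup actor can
        still be stopped before a cloud node is created."""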
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                del self.booting[key]
                del self.sizes_booting_shutdown[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
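        """Start a shutdown actor for the monitored node, unless one is
        already running for it."""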
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        self.shutdowns[cloud_node_id] = shutdown.proxy()
        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
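        """Start a cancellable shutdown when a monitor reports its node
        is eligible and that size has excess nodes."""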
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
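        """Shut down a node that failed to pair after booting.

        Scheduled by node_up; does nothing if the node has since reached
        the idle or busy state.  This shutdown is not cancellable."""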
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
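        """Clean up after a shutdown actor finishes.

        If the shutdown failed because the node was broken, blacklist the
        cloud node so it is not re-registered if the cloud keeps listing
        it."""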
        cloud_node, success, cancel_reason = self._get_actor_attrs(
            shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                self.cloud_nodes.blacklist(cloud_node_id)
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
        del self.shutdowns[cloud_node_id]
        del self.sizes_booting_shutdown[cloud_node_id]

    def shutdown(self):
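        """Begin an orderly daemon shutdown after a signal.

        Stops starting and stopping nodes, cancels setup actors that have
        not yet created cloud nodes, and waits for the rest to finish via
        await_shutdown."""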
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        self._later.await_shutdown()

    def await_shutdown(self):
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()