Merge branch '8673-cwl-runner-project-uuid' closes #8673
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .computenode import dispatch
13 from .config import actor_class
14
15 class _ComputeNodeRecord(object):
16     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
17                  assignment_time=float('-inf')):
18         self.actor = actor
19         self.cloud_node = cloud_node
20         self.arvados_node = arvados_node
21         self.assignment_time = assignment_time
22
23
24 class _BaseNodeTracker(object):
25     def __init__(self):
26         self.nodes = {}
27         self.orphans = {}
28         self._blacklist = set()
29
30     # Proxy the methods listed below to self.nodes.
31     def _proxy_method(name):
32         method = getattr(dict, name)
33         @functools.wraps(method, ('__name__', '__doc__'))
34         def wrapper(self, *args, **kwargs):
35             return method(self.nodes, *args, **kwargs)
36         return wrapper
37
38     for _method_name in ['__contains__', '__getitem__', '__len__', 'get']:
39         locals()[_method_name] = _proxy_method(_method_name)
40
41     def record_key(self, record):
42         return self.item_key(getattr(record, self.RECORD_ATTR))
43
44     def add(self, record):
45         self.nodes[self.record_key(record)] = record
46
47     def blacklist(self, key):
48         self._blacklist.add(key)
49
50     def update_record(self, key, item):
51         setattr(self.nodes[key], self.RECORD_ATTR, item)
52
53     def update_from(self, response):
54         unseen = set(self.nodes.iterkeys())
55         for item in response:
56             key = self.item_key(item)
57             if key in self._blacklist:
58                 continue
59             elif key in unseen:
60                 unseen.remove(key)
61                 self.update_record(key, item)
62             else:
63                 yield key, item
64         self.orphans = {key: self.nodes.pop(key) for key in unseen}
65
66     def unpaired(self):
67         return (record for record in self.nodes.itervalues()
68                 if getattr(record, self.PAIR_ATTR) is None)
69
70
class _CloudNodeTracker(_BaseNodeTracker):
    """Track compute node records keyed by their cloud node's ``id``."""
    RECORD_ATTR = 'cloud_node'
    PAIR_ATTR = 'arvados_node'

    @staticmethod
    def item_key(cloud_node):
        return cloud_node.id
75
76
class _ArvadosNodeTracker(_BaseNodeTracker):
    """Track compute node records keyed by their Arvados node's ``uuid``."""
    RECORD_ATTR = 'arvados_node'
    PAIR_ATTR = 'cloud_node'

    @staticmethod
    def item_key(arvados_node):
        return arvados_node['uuid']

    def find_stale_node(self, stale_time):
        """Return the first tracked Arvados node record that is stale, or None.

        A record is stale when cnode.timestamp_fresh() rejects both its
        Arvados modification time and its assignment_time for `stale_time`.
        """
        def _is_stale(rec):
            mtime = cnode.arvados_node_mtime(rec.arvados_node)
            if cnode.timestamp_fresh(mtime, stale_time):
                return False
            return not cnode.timestamp_fresh(rec.assignment_time, stale_time)

        return next((rec.arvados_node for rec in self.nodes.itervalues()
                     if _is_stale(rec)), None)
91
92
class NodeManagerDaemonActor(actor_class):
    """Node Manager daemon.

    This actor subscribes to all information polls about cloud nodes,
    Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
    for every cloud node, subscribing them to poll updates
    appropriately.  It creates and destroys cloud nodes based on job queue
    demand, and stops the corresponding ComputeNode actors when their work
    is done.

    Bookkeeping kept by the methods below:

    * ``booting``: setup actor URN -> proxy of the running setup actor.
    * ``booted``: cloud node ID -> _ComputeNodeRecord for nodes whose setup
      finished but which have not yet appeared in a cloud node poll.
    * ``shutdowns``: cloud node ID -> proxy of the running shutdown actor.
    * ``sizes_booting_shutdown``: every key of ``booting`` and ``shutdowns``
      -> cloud size of that node.
    """
    def __init__(self, server_wishlist_actor, arvados_nodes_actor,
                 cloud_nodes_actor, cloud_update_actor, timer_actor,
                 arvados_factory, cloud_factory,
                 shutdown_windows, server_calculator,
                 min_nodes, max_nodes,
                 poll_stale_after=600,
                 boot_fail_after=1800,
                 node_stale_after=7200,
                 node_setup_class=dispatch.ComputeNodeSetupActor,
                 node_shutdown_class=dispatch.ComputeNodeShutdownActor,
                 node_actor_class=dispatch.ComputeNodeMonitorActor,
                 max_total_price=0):
        super(NodeManagerDaemonActor, self).__init__()
        self._node_setup = node_setup_class
        self._node_shutdown = node_shutdown_class
        self._node_actor = node_actor_class
        self._cloud_updater = cloud_update_actor
        self._timer = timer_actor
        self._new_arvados = arvados_factory
        self._new_cloud = cloud_factory
        self._cloud_driver = self._new_cloud()
        self._later = self.actor_ref.tell_proxy()
        self.shutdown_windows = shutdown_windows
        self.server_calculator = server_calculator
        self.min_cloud_size = self.server_calculator.cheapest_size()
        self.min_nodes = min_nodes
        self.max_nodes = max_nodes
        self.max_total_price = max_total_price
        self.poll_stale_after = poll_stale_after
        self.boot_fail_after = boot_fail_after
        self.node_stale_after = node_stale_after
        self.last_polls = {}
        # Subscribe to each poller, which is looked up by name among this
        # method's arguments.  Seeding last_polls with a stale timestamp
        # keeps _check_poll_freshness closed until every poller has reported.
        for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
            poll_actor = locals()[poll_name + '_actor']
            poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
            setattr(self, '_{}_actor'.format(poll_name), poll_actor)
            self.last_polls[poll_name] = -self.poll_stale_after
        # Latest wishlist from the server wishlist poller.  It starts empty so
        # size calculations never hit a missing attribute.
        self.last_wishlist = []
        self.cloud_nodes = _CloudNodeTracker()
        self.arvados_nodes = _ArvadosNodeTracker()
        self.booting = {}       # Actor IDs to ComputeNodeSetupActor proxies
        self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
        self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size

    def on_start(self):
        # Suffix the logger name with the tail of this actor's URN.
        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
        self._logger.debug("Daemon started")

    def _update_poll_time(self, poll_key):
        """Record that the poll named `poll_key` just delivered data."""
        self.last_polls[poll_key] = time.time()

    def _pair_nodes(self, node_record, arvados_node):
        """Attach `arvados_node` to the cloud node in `node_record`.

        Subscribes the node's monitor actor to updates for that Arvados node
        and files the record in the Arvados node tracker.
        """
        self._logger.info("Cloud node %s is now paired with Arvados node %s",
                          node_record.cloud_node.name, arvados_node['uuid'])
        self._arvados_nodes_actor.subscribe_to(
            arvados_node['uuid'], node_record.actor.update_arvados_node)
        node_record.arvados_node = arvados_node
        self.arvados_nodes.add(node_record)

    def _new_node(self, cloud_node):
        """Start a monitor actor for `cloud_node` and return its record.

        The daemon's node_can_shutdown is subscribed to the monitor.  The
        monitor is subscribed to cloud poll updates for this node ID.
        """
        start_time = self._cloud_driver.node_start_time(cloud_node)
        shutdown_timer = cnode.ShutdownTimer(start_time,
                                             self.shutdown_windows)
        actor = self._node_actor.start(
            cloud_node=cloud_node,
            cloud_node_start_time=start_time,
            shutdown_timer=shutdown_timer,
            cloud_fqdn_func=self._cloud_driver.node_fqdn,
            update_actor=self._cloud_updater,
            timer_actor=self._timer,
            arvados_node=None,
            poll_stale_after=self.poll_stale_after,
            node_stale_after=self.node_stale_after,
            cloud_client=self._cloud_driver,
            boot_fail_after=self.boot_fail_after)
        actor_tell = actor.tell_proxy()
        actor_tell.subscribe(self._later.node_can_shutdown)
        self._cloud_nodes_actor.subscribe_to(cloud_node.id,
                                             actor_tell.update_cloud_node)
        return _ComputeNodeRecord(actor.proxy(), cloud_node)

    def update_cloud_nodes(self, nodelist):
        """Handle a fresh listing of cloud nodes from the cloud node poller.

        Each new cloud node gets a record.  The record created by node_up()
        is reused when one exists.  The new node is then offered to unpaired
        Arvados nodes.

        For cloud nodes that vanished from the listing, any in-progress
        shutdown is stopped and forgotten, and the monitor actor is stopped.
        """
        self._update_poll_time('cloud_nodes')
        for key, node in self.cloud_nodes.update_from(nodelist):
            self._logger.info("Registering new cloud node %s", key)
            if key in self.booted:
                record = self.booted.pop(key)
            else:
                record = self._new_node(node)
            self.cloud_nodes.add(record)
            for arv_rec in self.arvados_nodes.unpaired():
                if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
                    self._pair_nodes(record, arv_rec.arvados_node)
                    break
        for key, record in self.cloud_nodes.orphans.iteritems():
            if key in self.shutdowns:
                try:
                    self.shutdowns[key].stop().get()
                except pykka.ActorDeadError:
                    pass  # Already stopped; nothing more to do.
                del self.shutdowns[key]
                del self.sizes_booting_shutdown[key]
            record.actor.stop()
            record.cloud_node = None

    def update_arvados_nodes(self, nodelist):
        """Handle a fresh listing of Arvados nodes from the Arvados poller.

        Registers new Arvados nodes, then offers every unpaired Arvados node
        to the unpaired cloud nodes until one accepts.
        """
        self._update_poll_time('arvados_nodes')
        for key, node in self.arvados_nodes.update_from(nodelist):
            self._logger.info("Registering new Arvados node %s", key)
            record = _ComputeNodeRecord(arvados_node=node)
            self.arvados_nodes.add(record)
        for arv_rec in self.arvados_nodes.unpaired():
            arv_node = arv_rec.arvados_node
            for cloud_rec in self.cloud_nodes.unpaired():
                if cloud_rec.actor.offer_arvados_pair(arv_node).get():
                    self._pair_nodes(cloud_rec, arv_node)
                    break

    def _nodes_booting(self, size):
        """Count setup actors plus booted-but-unlisted nodes of `size`.

        Passing None for `size` counts every size.
        """
        s = sum(1
                for c in self.booting.iterkeys()
                if size is None or self.sizes_booting_shutdown[c].id == size.id)
        s += sum(1
                 for c in self.booted.itervalues()
                 if size is None or c.cloud_node.size.id == size.id)
        return s

    def _nodes_unpaired(self, size):
        """Count listed cloud nodes of `size` without an Arvados node (None: any size)."""
        return sum(1
                   for c in self.cloud_nodes.unpaired()
                   if size is None or c.cloud_node.size.id == size.id)

    def _nodes_booted(self, size):
        """Count listed cloud nodes of `size` (None: any size)."""
        return sum(1
                  for c in self.cloud_nodes.nodes.itervalues()
                  if size is None or c.cloud_node.size.id == size.id)

    def _nodes_up(self, size):
        """Count booting plus listed cloud nodes of `size` (None: any size)."""
        return self._nodes_booting(size) + self._nodes_booted(size)

    def _total_price(self):
        """Return the summed price of booting, booted and listed nodes."""
        cost = 0
        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
                  for c in self.booting.iterkeys())
        cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                    for i in (self.booted, self.cloud_nodes.nodes)
                    for c in i.itervalues())
        return cost

    def _nodes_busy(self, size):
        """Count listed nodes of `size` whose monitor reports the 'busy' state."""
        return sum(1 for busy in
                   pykka.get_all(rec.actor.in_state('busy') for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id)
                   if busy)

    def _nodes_missing(self, size):
        """Count listed nodes of `size` whose Arvados node is missing.

        "Missing" is decided by cnode.arvados_node_missing().  Nodes that
        are being shut down are excluded.
        """
        return sum(1 for arv_node in
                   pykka.get_all(rec.actor.arvados_node for rec in
                                 self.cloud_nodes.nodes.itervalues()
                                 if rec.cloud_node.size.id == size.id and rec.actor.cloud_node.get().id not in self.shutdowns)
                   if arv_node and cnode.arvados_node_missing(arv_node, self.node_stale_after))

    def _size_wishlist(self, size):
        """Count how many nodes of `size` the last wishlist asked for."""
        return sum(1 for c in self.last_wishlist if c.id == size.id)

    def _size_shutdowns(self, size):
        """Count shutdowns in progress for nodes of `size`."""
        # Only a dict lookup happens here, so no actor error can be raised;
        # the old `except pykka.ActorDeadError` was dead code.
        return sum(1 for c in self.shutdowns.iterkeys()
                   if self.sizes_booting_shutdown[c].id == size.id)

    def _nodes_wanted(self, size):
        """Return how many more nodes of `size` to boot.

        Zero or negative means no more should boot.  When the total node
        count is above max_nodes, the value is minus the overshoot.  Boots
        are capped so the total price stays within max_total_price, when
        that limit is set.
        """
        total_up_count = self._nodes_up(None)
        under_min = self.min_nodes - total_up_count
        over_max = total_up_count - self.max_nodes

        if over_max >= 0:
            return -over_max
        elif under_min > 0 and size.id == self.min_cloud_size.id:
            return under_min

        wishlist_count = self._size_wishlist(size)
        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
        shutdown_count = self._size_shutdowns(size)
        busy_count = self._nodes_busy(size)
        up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))

        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
                          wishlist_count,
                          up_count + busy_count,
                          booting_count,
                          up_count - booting_count,
                          busy_count,
                          shutdown_count)

        wanted = wishlist_count - up_count
        if wanted > 0 and self.max_total_price:
            # Only price the fleet when a price cap could actually apply.
            total_price = self._total_price()
            if (total_price + (size.price*wanted)) > self.max_total_price:
                can_boot = int((self.max_total_price - total_price) / size.price)
                if can_boot == 0:
                    self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
                                      size.name, size.price, self.max_total_price, total_price)
                return can_boot
        return wanted

    def _nodes_excess(self, size):
        """Count up nodes of `size` not needed for busy work, the wishlist, or min_nodes."""
        up_count = self._nodes_up(size) - self._size_shutdowns(size)
        if size.id == self.min_cloud_size.id:
            up_count -= self.min_nodes
        return up_count - self._nodes_busy(size) - self._size_wishlist(size)

    def update_server_wishlist(self, wishlist):
        """Handle a new wishlist and queue boots or boot cancellations per size."""
        self._update_poll_time('server_wishlist')
        self.last_wishlist = wishlist
        for size in reversed(self.server_calculator.cloud_sizes):
            try:
                nodes_wanted = self._nodes_wanted(size)
                if nodes_wanted > 0:
                    self._later.start_node(size)
                elif (nodes_wanted < 0) and self.booting:
                    self._later.stop_booting_node(size)
            except Exception:
                self._logger.exception("while calculating nodes wanted for size %s", size)

    def _check_poll_freshness(orig_func):
        """Decorator to inhibit a method when poll information is stale.

        This decorator checks the timestamps of all the poll information the
        daemon has received.  The decorated method is only called if none
        of the timestamps are considered stale.
        """
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            now = time.time()
            if all(now - t < self.poll_stale_after
                   for t in self.last_polls.itervalues()):
                return orig_func(self, *args, **kwargs)
            else:
                return None
        return wrapper

    @_check_poll_freshness
    def start_node(self, cloud_size):
        """Start a setup actor for one node of `cloud_size`, if still wanted.

        Reuses a stale Arvados node record when one is available.  Queues
        another start_node call while more nodes are wanted.
        """
        nodes_wanted = self._nodes_wanted(cloud_size)
        if nodes_wanted < 1:
            return None
        arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
        self._logger.info("Want %i more %s nodes.  Booting a node.",
                          nodes_wanted, cloud_size.name)
        setup_ref = self._node_setup.start(
            timer_actor=self._timer,
            arvados_client=self._new_arvados(),
            arvados_node=arvados_node,
            cloud_client=self._new_cloud(),
            cloud_size=cloud_size)
        # Store a regular proxy, not a tell proxy.  stop_booting_node() and
        # shutdown() read attributes through it and call .get() on its
        # method results, so they need futures.
        # NOTE(review): assumes tell_proxy() calls are fire-and-forget.
        new_setup = setup_ref.proxy()
        self.booting[new_setup.actor_ref.actor_urn] = new_setup
        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size

        if arvados_node is not None:
            self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                time.time())
        setup_ref.tell_proxy().subscribe(self._later.node_up)
        if nodes_wanted > 1:
            self._later.start_node(cloud_size)

    def _get_actor_attrs(self, actor, *attr_names):
        """Fetch several attributes from an actor proxy at once."""
        return pykka.get_all([getattr(actor, name) for name in attr_names])

    def node_up(self, setup_proxy):
        """Handle a setup actor finishing.

        Forgets and stops the setup actor.  If it produced a cloud node,
        ensures a monitor record exists and schedules
        shutdown_unpaired_node after boot_fail_after seconds.
        """
        cloud_node = setup_proxy.cloud_node.get()
        del self.booting[setup_proxy.actor_ref.actor_urn]
        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]

        setup_proxy.stop()
        if cloud_node is not None:
            record = self.cloud_nodes.get(cloud_node.id)
            if record is None:
                record = self._new_node(cloud_node)
                self.booted[cloud_node.id] = record
            self._timer.schedule(time.time() + self.boot_fail_after,
                                 self._later.shutdown_unpaired_node, cloud_node.id)

    @_check_poll_freshness
    def stop_booting_node(self, size):
        """Cancel one setup actor of `size` that has no cloud node yet.

        Only acts when nodes of `size` are in excess.  Queues another
        call while more excess remains.
        """
        nodes_excess = self._nodes_excess(size)
        if (nodes_excess < 1) or not self.booting:
            return None
        for key, node in self.booting.iteritems():
            if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                # Safe to mutate: we break out of the iteration right below.
                del self.booting[key]
                del self.sizes_booting_shutdown[key]

                if nodes_excess > 1:
                    self._later.stop_booting_node(size)
                break

    def _begin_node_shutdown(self, node_actor, cancellable):
        """Start a shutdown actor for `node_actor`'s node unless one is running."""
        cloud_node_obj = node_actor.cloud_node.get()
        cloud_node_id = cloud_node_obj.id
        if cloud_node_id in self.shutdowns:
            return None
        shutdown = self._node_shutdown.start(
            timer_actor=self._timer, cloud_client=self._new_cloud(),
            arvados_client=self._new_arvados(),
            node_monitor=node_actor.actor_ref, cancellable=cancellable)
        self.shutdowns[cloud_node_id] = shutdown.proxy()
        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
        shutdown.tell_proxy().subscribe(self._later.node_finished_shutdown)

    @_check_poll_freshness
    def node_can_shutdown(self, node_actor):
        """Begin a cancellable shutdown if nodes of this size are in excess."""
        if self._nodes_excess(node_actor.cloud_node.get().size) > 0:
            self._begin_node_shutdown(node_actor, cancellable=True)

    def shutdown_unpaired_node(self, cloud_node_id):
        """Shut down, non-cancellably, a node that is neither idle nor busy."""
        for record_dict in [self.cloud_nodes, self.booted]:
            if cloud_node_id in record_dict:
                record = record_dict[cloud_node_id]
                break
        else:
            return None
        if not record.actor.in_state('idle', 'busy').get():
            self._begin_node_shutdown(record.actor, cancellable=False)

    def node_finished_shutdown(self, shutdown_actor):
        """Handle a shutdown actor finishing.

        On failure, blacklists the cloud node when the cancel reason is
        NODE_BROKEN.  On success, stops any record still waiting in
        `booted`.  In both cases the shutdown bookkeeping is dropped.
        """
        try:
            cloud_node, success, cancel_reason = self._get_actor_attrs(
                shutdown_actor, 'cloud_node', 'success', 'cancel_reason')
        except pykka.ActorDeadError:
            # The shutdown actor is already gone.  In this file that happens
            # when update_cloud_nodes() stopped it for an orphaned cloud node
            # and already removed its bookkeeping.
            return
        shutdown_actor.stop()
        cloud_node_id = cloud_node.id
        if not success:
            if cancel_reason == self._node_shutdown.NODE_BROKEN:
                self.cloud_nodes.blacklist(cloud_node_id)
        elif cloud_node_id in self.booted:
            self.booted.pop(cloud_node_id).actor.stop()
        # update_cloud_nodes() may already have dropped these entries.
        self.shutdowns.pop(cloud_node_id, None)
        self.sizes_booting_shutdown.pop(cloud_node_id, None)

    def shutdown(self):
        """Stop creating nodes, cancel pending setups, then wait to stop."""
        self._logger.info("Shutting down after signal.")
        self.poll_stale_after = -1  # Inhibit starting/stopping nodes
        setup_stops = {key: node.stop_if_no_cloud_node()
                       for key, node in self.booting.iteritems()}
        self.booting = {key: self.booting[key]
                        for key in setup_stops if not setup_stops[key].get()}
        # Forget the sizes of the setup actors that were just stopped.
        for key in setup_stops:
            if key not in self.booting:
                self.sizes_booting_shutdown.pop(key, None)
        self._later.await_shutdown()

    def await_shutdown(self):
        """Poll once a second until no setup actors remain, then stop."""
        if self.booting:
            self._timer.schedule(time.time() + 1, self._later.await_shutdown)
        else:
            self.stop()