Merge branch '3699-arv-copy'
[arvados.git] / services / nodemanager / arvnodeman / daemon.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import pykka
10
11 from . import computenode as cnode
12 from .config import actor_class
13
14 class _ComputeNodeRecord(object):
15     def __init__(self, actor=None, cloud_node=None, arvados_node=None,
16                  assignment_time=float('-inf')):
17         self.actor = actor
18         self.cloud_node = cloud_node
19         self.arvados_node = arvados_node
20         self.assignment_time = assignment_time
21
22
23 class _BaseNodeTracker(object):
24     def __init__(self):
25         self.nodes = {}
26         self.orphans = {}
27
28     def __getitem__(self, key):
29         return self.nodes[key]
30
31     def __len__(self):
32         return len(self.nodes)
33
34     def get(self, key, default=None):
35         return self.nodes.get(key, default)
36
37     def record_key(self, record):
38         return self.item_key(getattr(record, self.RECORD_ATTR))
39
40     def add(self, record):
41         self.nodes[self.record_key(record)] = record
42
43     def update_record(self, key, item):
44         setattr(self.nodes[key], self.RECORD_ATTR, item)
45
46     def update_from(self, response):
47         unseen = set(self.nodes.iterkeys())
48         for item in response:
49             key = self.item_key(item)
50             if key in unseen:
51                 unseen.remove(key)
52                 self.update_record(key, item)
53             else:
54                 yield key, item
55         self.orphans = {key: self.nodes.pop(key) for key in unseen}
56
57     def unpaired(self):
58         return (record for record in self.nodes.itervalues()
59                 if getattr(record, self.PAIR_ATTR) is None)
60
61
62 class _CloudNodeTracker(_BaseNodeTracker):
63     RECORD_ATTR = 'cloud_node'
64     PAIR_ATTR = 'arvados_node'
65     item_key = staticmethod(lambda cloud_node: cloud_node.id)
66
67
68 class _ArvadosNodeTracker(_BaseNodeTracker):
69     RECORD_ATTR = 'arvados_node'
70     PAIR_ATTR = 'cloud_node'
71     item_key = staticmethod(lambda arvados_node: arvados_node['uuid'])
72
73     def find_stale_node(self, stale_time):
74         for record in self.nodes.itervalues():
75             node = record.arvados_node
76             if (not cnode.timestamp_fresh(cnode.arvados_node_mtime(node),
77                                           stale_time) and
78                   not cnode.timestamp_fresh(record.assignment_time,
79                                             stale_time)):
80                 return node
81         return None
82
83
84 class NodeManagerDaemonActor(actor_class):
85     """Node Manager daemon.
86
87     This actor subscribes to all information polls about cloud nodes,
88     Arvados nodes, and the job queue.  It creates a ComputeNodeMonitorActor
89     for every cloud node, subscribing them to poll updates
90     appropriately.  It creates and destroys cloud nodes based on job queue
91     demand, and stops the corresponding ComputeNode actors when their work
92     is done.
93     """
94     def __init__(self, server_wishlist_actor, arvados_nodes_actor,
95                  cloud_nodes_actor, cloud_update_actor, timer_actor,
96                  arvados_factory, cloud_factory,
97                  shutdown_windows, max_nodes,
98                  poll_stale_after=600, node_stale_after=7200,
99                  node_setup_class=cnode.ComputeNodeSetupActor,
100                  node_shutdown_class=cnode.ComputeNodeShutdownActor,
101                  node_actor_class=cnode.ComputeNodeMonitorActor):
102         super(NodeManagerDaemonActor, self).__init__()
103         self._node_setup = node_setup_class
104         self._node_shutdown = node_shutdown_class
105         self._node_actor = node_actor_class
106         self._cloud_updater = cloud_update_actor
107         self._timer = timer_actor
108         self._new_arvados = arvados_factory
109         self._new_cloud = cloud_factory
110         self._cloud_driver = self._new_cloud()
111         self._logger = logging.getLogger('arvnodeman.daemon')
112         self._later = self.actor_ref.proxy()
113         self.shutdown_windows = shutdown_windows
114         self.max_nodes = max_nodes
115         self.poll_stale_after = poll_stale_after
116         self.node_stale_after = node_stale_after
117         self.last_polls = {}
118         for poll_name in ['server_wishlist', 'arvados_nodes', 'cloud_nodes']:
119             poll_actor = locals()[poll_name + '_actor']
120             poll_actor.subscribe(getattr(self._later, 'update_' + poll_name))
121             setattr(self, '_{}_actor'.format(poll_name), poll_actor)
122             self.last_polls[poll_name] = -self.poll_stale_after
123         self.cloud_nodes = _CloudNodeTracker()
124         self.arvados_nodes = _ArvadosNodeTracker()
125         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
126         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
127         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
128         self._logger.debug("Daemon initialized")
129
130     def _update_poll_time(self, poll_key):
131         self.last_polls[poll_key] = time.time()
132
133     def _pair_nodes(self, node_record, arvados_node):
134         self._logger.info("Cloud node %s has associated with Arvados node %s",
135                           node_record.cloud_node.id, arvados_node['uuid'])
136         self._arvados_nodes_actor.subscribe_to(
137             arvados_node['uuid'], node_record.actor.update_arvados_node)
138         node_record.arvados_node = arvados_node
139         self.arvados_nodes.add(node_record)
140
141     def _new_node(self, cloud_node):
142         start_time = self._cloud_driver.node_start_time(cloud_node)
143         shutdown_timer = cnode.ShutdownTimer(start_time,
144                                              self.shutdown_windows)
145         actor = self._node_actor.start(
146             cloud_node=cloud_node,
147             cloud_node_start_time=start_time,
148             shutdown_timer=shutdown_timer,
149             update_actor=self._cloud_updater,
150             timer_actor=self._timer,
151             arvados_node=None,
152             poll_stale_after=self.poll_stale_after,
153             node_stale_after=self.node_stale_after).proxy()
154         actor.subscribe(self._later.node_can_shutdown)
155         self._cloud_nodes_actor.subscribe_to(cloud_node.id,
156                                              actor.update_cloud_node)
157         record = _ComputeNodeRecord(actor, cloud_node)
158         return record
159
160     def update_cloud_nodes(self, nodelist):
161         self._update_poll_time('cloud_nodes')
162         for key, node in self.cloud_nodes.update_from(nodelist):
163             self._logger.info("Registering new cloud node %s", key)
164             if key in self.booted:
165                 record = self.booted.pop(key)
166             else:
167                 record = self._new_node(node)
168             self.cloud_nodes.add(record)
169             for arv_rec in self.arvados_nodes.unpaired():
170                 if record.actor.offer_arvados_pair(arv_rec.arvados_node).get():
171                     self._pair_nodes(record, arv_rec.arvados_node)
172                     break
173         for key, record in self.cloud_nodes.orphans.iteritems():
174             record.actor.stop()
175             self.shutdowns.pop(key, None)
176
177     def update_arvados_nodes(self, nodelist):
178         self._update_poll_time('arvados_nodes')
179         for key, node in self.arvados_nodes.update_from(nodelist):
180             self._logger.info("Registering new Arvados node %s", key)
181             record = _ComputeNodeRecord(arvados_node=node)
182             self.arvados_nodes.add(record)
183         for arv_rec in self.arvados_nodes.unpaired():
184             arv_node = arv_rec.arvados_node
185             for cloud_rec in self.cloud_nodes.unpaired():
186                 if cloud_rec.actor.offer_arvados_pair(arv_node).get():
187                     self._pair_nodes(cloud_rec, arv_node)
188                     break
189
190     def _node_count(self):
191         up = sum(len(nodelist) for nodelist in
192                  [self.cloud_nodes, self.booted, self.booting])
193         return up - len(self.shutdowns)
194
195     def _nodes_wanted(self):
196         return len(self.last_wishlist) - self._node_count()
197
198     def _nodes_excess(self):
199         return -self._nodes_wanted()
200
201     def update_server_wishlist(self, wishlist):
202         self._update_poll_time('server_wishlist')
203         self.last_wishlist = wishlist[:self.max_nodes]
204         nodes_wanted = self._nodes_wanted()
205         if nodes_wanted > 0:
206             self._later.start_node()
207         elif (nodes_wanted < 0) and self.booting:
208             self._later.stop_booting_node()
209
210     def _check_poll_freshness(orig_func):
211         """Decorator to inhibit a method when poll information is stale.
212
213         This decorator checks the timestamps of all the poll information the
214         daemon has received.  The decorated method is only called if none
215         of the timestamps are considered stale.
216         """
217         @functools.wraps(orig_func)
218         def wrapper(self, *args, **kwargs):
219             now = time.time()
220             if all(now - t < self.poll_stale_after
221                    for t in self.last_polls.itervalues()):
222                 return orig_func(self, *args, **kwargs)
223             else:
224                 return None
225         return wrapper
226
227     @_check_poll_freshness
228     def start_node(self):
229         nodes_wanted = self._nodes_wanted()
230         if nodes_wanted < 1:
231             return None
232         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
233         cloud_size = self.last_wishlist[nodes_wanted - 1]
234         self._logger.info("Want %s more nodes.  Booting a %s node.",
235                           nodes_wanted, cloud_size.name)
236         new_setup = self._node_setup.start(
237             timer_actor=self._timer,
238             arvados_client=self._new_arvados(),
239             arvados_node=arvados_node,
240             cloud_client=self._new_cloud(),
241             cloud_size=cloud_size).proxy()
242         self.booting[new_setup.actor_ref.actor_urn] = new_setup
243         if arvados_node is not None:
244             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
245                 time.time())
246         new_setup.subscribe(self._later.node_up)
247         if nodes_wanted > 1:
248             self._later.start_node()
249
250     def _actor_nodes(self, node_actor):
251         return pykka.get_all([node_actor.cloud_node, node_actor.arvados_node])
252
253     def node_up(self, setup_proxy):
254         cloud_node, arvados_node = self._actor_nodes(setup_proxy)
255         del self.booting[setup_proxy.actor_ref.actor_urn]
256         setup_proxy.stop()
257         record = self.cloud_nodes.get(cloud_node.id)
258         if record is None:
259             record = self._new_node(cloud_node)
260             self.booted[cloud_node.id] = record
261         self._pair_nodes(record, arvados_node)
262
263     @_check_poll_freshness
264     def stop_booting_node(self):
265         nodes_excess = self._nodes_excess()
266         if (nodes_excess < 1) or not self.booting:
267             return None
268         for key, node in self.booting.iteritems():
269             node.stop_if_no_cloud_node().get()
270             if not node.actor_ref.is_alive():
271                 del self.booting[key]
272                 if nodes_excess > 1:
273                     self._later.stop_booting_node()
274                 break
275
276     @_check_poll_freshness
277     def node_can_shutdown(self, node_actor):
278         if self._nodes_excess() < 1:
279             return None
280         cloud_node, arvados_node = self._actor_nodes(node_actor)
281         if cloud_node.id in self.shutdowns:
282             return None
283         shutdown = self._node_shutdown.start(timer_actor=self._timer,
284                                              cloud_client=self._new_cloud(),
285                                              cloud_node=cloud_node).proxy()
286         self.shutdowns[cloud_node.id] = shutdown
287         shutdown.subscribe(self._later.node_finished_shutdown)
288
289     def node_finished_shutdown(self, shutdown_actor):
290         cloud_node_id = shutdown_actor.cloud_node.get().id
291         shutdown_actor.stop()
292         if cloud_node_id in self.booted:
293             self.booted.pop(cloud_node_id).actor.stop()
294             del self.shutdowns[cloud_node_id]
295
296     def shutdown(self):
297         self._logger.info("Shutting down after signal.")
298         self.poll_stale_after = -1  # Inhibit starting/stopping nodes
299         for bootnode in self.booting.itervalues():
300             bootnode.stop_if_no_cloud_node()
301         self._later.await_shutdown()
302
303     def await_shutdown(self):
304         if any(node.actor_ref.is_alive() for node in self.booting.itervalues()):
305             self._timer.schedule(time.time() + 1, self._later.await_shutdown)
306         else:
307             self.stop()