Merge branch '4042-run-command-MxN' closes #4042
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import itertools
7 import logging
8 import time
9
10 import pykka
11
12 from ..clientactor import _notify_subscribers
13 from .. import config
14
15 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
16     hostname = arvados_node.get('hostname') or default_hostname
17     return '{}.{}'.format(hostname, arvados_node['domain'])
18
19 def arvados_node_mtime(node):
20     return time.mktime(time.strptime(node['modified_at'] + 'UTC',
21                                      '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
22
23 def timestamp_fresh(timestamp, fresh_time):
24     return (time.time() - timestamp) < fresh_time
25
26 def _retry(errors):
27     """Retry decorator for an actor method that makes remote requests.
28
29     Use this function to decorator an actor method, and pass in a tuple of
30     exceptions to catch.  This decorator will schedule retries of that method
31     with exponential backoff if the original method raises any of the given
32     errors.
33     """
34     def decorator(orig_func):
35         @functools.wraps(orig_func)
36         def wrapper(self, *args, **kwargs):
37             try:
38                 orig_func(self, *args, **kwargs)
39             except errors as error:
40                 self._logger.warning(
41                     "Client error: %s - waiting %s seconds",
42                     error, self.retry_wait)
43                 self._timer.schedule(self.retry_wait,
44                                      getattr(self._later, orig_func.__name__),
45                                      *args, **kwargs)
46                 self.retry_wait = min(self.retry_wait * 2,
47                                       self.max_retry_wait)
48             else:
49                 self.retry_wait = self.min_retry_wait
50         return wrapper
51     return decorator
52
53 class BaseComputeNodeDriver(object):
54     """Abstract base class for compute node drivers.
55
56     libcloud abstracts away many of the differences between cloud providers,
57     but managing compute nodes requires some cloud-specific features (e.g.,
58     on EC2 we use tags to identify compute nodes).  Compute node drivers
59     are responsible for translating the node manager's cloud requests to a
60     specific cloud's vocabulary.
61
62     Subclasses must implement arvados_create_kwargs (to update node
63     creation kwargs with information about the specific Arvados node
64     record), sync_node, and node_start_time.
65     """
66     def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
67         self.real = driver_class(**auth_kwargs)
68         self.list_kwargs = list_kwargs
69         self.create_kwargs = create_kwargs
70
71     def __getattr__(self, name):
72         # Proxy non-extension methods to the real driver.
73         if (not name.startswith('_') and not name.startswith('ex_')
74               and hasattr(self.real, name)):
75             return getattr(self.real, name)
76         else:
77             return super(BaseComputeNodeDriver, self).__getattr__(name)
78
79     def search_for(self, term, list_method, key=lambda item: item.id):
80         cache_key = (list_method, term)
81         if cache_key not in self.SEARCH_CACHE:
82             results = [item for item in getattr(self.real, list_method)()
83                        if key(item) == term]
84             count = len(results)
85             if count != 1:
86                 raise ValueError("{} returned {} results for '{}'".format(
87                         list_method, count, term))
88             self.SEARCH_CACHE[cache_key] = results[0]
89         return self.SEARCH_CACHE[cache_key]
90
91     def list_nodes(self):
92         return self.real.list_nodes(**self.list_kwargs)
93
94     def arvados_create_kwargs(self, arvados_node):
95         raise NotImplementedError("BaseComputeNodeDriver.arvados_create_kwargs")
96
97     def create_node(self, size, arvados_node):
98         kwargs = self.create_kwargs.copy()
99         kwargs.update(self.arvados_create_kwargs(arvados_node))
100         kwargs['size'] = size
101         return self.real.create_node(**kwargs)
102
103     def sync_node(self, cloud_node, arvados_node):
104         # When a compute node first pings the API server, the API server
105         # will automatically assign some attributes on the corresponding
106         # node record, like hostname.  This method should propagate that
107         # information back to the cloud node appropriately.
108         raise NotImplementedError("BaseComputeNodeDriver.sync_node")
109
110     @classmethod
111     def node_start_time(cls, node):
112         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
113
114
115 ComputeNodeDriverClass = BaseComputeNodeDriver
116
117 class ComputeNodeSetupActor(config.actor_class):
118     """Actor to create and set up a cloud compute node.
119
120     This actor prepares an Arvados node record for a new compute node
121     (either creating one or cleaning one passed in), then boots the
122     actual compute node.  It notifies subscribers when the cloud node
123     is successfully created (the last step in the process for Node
124     Manager to handle).
125     """
126     def __init__(self, timer_actor, arvados_client, cloud_client,
127                  cloud_size, arvados_node=None,
128                  retry_wait=1, max_retry_wait=180):
129         super(ComputeNodeSetupActor, self).__init__()
130         self._timer = timer_actor
131         self._arvados = arvados_client
132         self._cloud = cloud_client
133         self._later = self.actor_ref.proxy()
134         self._logger = logging.getLogger('arvnodeman.nodeup')
135         self.cloud_size = cloud_size
136         self.subscribers = set()
137         self.min_retry_wait = retry_wait
138         self.max_retry_wait = max_retry_wait
139         self.retry_wait = retry_wait
140         self.arvados_node = None
141         self.cloud_node = None
142         if arvados_node is None:
143             self._later.create_arvados_node()
144         else:
145             self._later.prepare_arvados_node(arvados_node)
146
147     @_retry(config.ARVADOS_ERRORS)
148     def create_arvados_node(self):
149         self.arvados_node = self._arvados.nodes().create(body={}).execute()
150         self._later.create_cloud_node()
151
152     @_retry(config.ARVADOS_ERRORS)
153     def prepare_arvados_node(self, node):
154         self.arvados_node = self._arvados.nodes().update(
155             uuid=node['uuid'],
156             body={'hostname': None,
157                   'ip_address': None,
158                   'slot_number': None,
159                   'first_ping_at': None,
160                   'last_ping_at': None,
161                   'info': {'ec2_instance_id': None,
162                            'last_action': "Prepared by Node Manager"}}
163             ).execute()
164         self._later.create_cloud_node()
165
166     @_retry(config.CLOUD_ERRORS)
167     def create_cloud_node(self):
168         self._logger.info("Creating cloud node with size %s.",
169                           self.cloud_size.name)
170         self.cloud_node = self._cloud.create_node(self.cloud_size,
171                                                   self.arvados_node)
172         self._logger.info("Cloud node %s created.", self.cloud_node.id)
173         _notify_subscribers(self._later, self.subscribers)
174         self.subscribers = None
175
176     def stop_if_no_cloud_node(self):
177         if self.cloud_node is None:
178             self.stop()
179
180     def subscribe(self, subscriber):
181         if self.subscribers is None:
182             try:
183                 subscriber(self._later)
184             except pykka.ActorDeadError:
185                 pass
186         else:
187             self.subscribers.add(subscriber)
188
189
190 class ComputeNodeShutdownActor(config.actor_class):
191     """Actor to shut down a compute node.
192
193     This actor simply destroys a cloud node, retrying as needed.
194     """
195     def __init__(self, timer_actor, cloud_client, cloud_node,
196                  retry_wait=1, max_retry_wait=180):
197         super(ComputeNodeShutdownActor, self).__init__()
198         self._timer = timer_actor
199         self._cloud = cloud_client
200         self._later = self.actor_ref.proxy()
201         self._logger = logging.getLogger('arvnodeman.nodedown')
202         self.cloud_node = cloud_node
203         self.min_retry_wait = retry_wait
204         self.max_retry_wait = max_retry_wait
205         self.retry_wait = retry_wait
206         self._later.shutdown_node()
207
208     @_retry(config.CLOUD_ERRORS)
209     def shutdown_node(self):
210         self._cloud.destroy_node(self.cloud_node)
211         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
212
213
214 class ComputeNodeUpdateActor(config.actor_class):
215     """Actor to dispatch one-off cloud management requests.
216
217     This actor receives requests for small cloud updates, and
218     dispatches them to a real driver.  ComputeNodeMonitorActors use
219     this to perform maintenance tasks on themselves.  Having a
220     dedicated actor for this gives us the opportunity to control the
221     flow of requests; e.g., by backing off when errors occur.
222
223     This actor is most like a "traditional" Pykka actor: there's no
224     subscribing, but instead methods return real driver results.  If
225     you're interested in those results, you should get them from the
226     Future that the proxy method returns.  Be prepared to handle exceptions
227     from the cloud driver when you do.
228     """
229     def __init__(self, cloud_factory, max_retry_wait=180):
230         super(ComputeNodeUpdateActor, self).__init__()
231         self._cloud = cloud_factory()
232         self.max_retry_wait = max_retry_wait
233         self.error_streak = 0
234         self.next_request_time = time.time()
235
236     def _throttle_errors(orig_func):
237         @functools.wraps(orig_func)
238         def wrapper(self, *args, **kwargs):
239             throttle_time = self.next_request_time - time.time()
240             if throttle_time > 0:
241                 time.sleep(throttle_time)
242             self.next_request_time = time.time()
243             try:
244                 result = orig_func(self, *args, **kwargs)
245             except config.CLOUD_ERRORS:
246                 self.error_streak += 1
247                 self.next_request_time += min(2 ** self.error_streak,
248                                               self.max_retry_wait)
249                 raise
250             else:
251                 self.error_streak = 0
252                 return result
253         return wrapper
254
255     @_throttle_errors
256     def sync_node(self, cloud_node, arvados_node):
257         return self._cloud.sync_node(cloud_node, arvados_node)
258
259
260 class ShutdownTimer(object):
261     """Keep track of a cloud node's shutdown windows.
262
263     Instantiate this class with a timestamp of when a cloud node started,
264     and a list of durations (in minutes) of when the node must not and may
265     be shut down, alternating.  The class will tell you when a shutdown
266     window is open, and when the next open window will start.
267     """
268     def __init__(self, start_time, shutdown_windows):
269         # The implementation is easiest if we have an even number of windows,
270         # because then windows always alternate between open and closed.
271         # Rig that up: calculate the first shutdown window based on what's
272         # passed in.  Then, if we were given an odd number of windows, merge
273         # that first window into the last one, since they both# represent
274         # closed state.
275         first_window = shutdown_windows[0]
276         shutdown_windows = list(shutdown_windows[1:])
277         self._next_opening = start_time + (60 * first_window)
278         if len(shutdown_windows) % 2:
279             shutdown_windows.append(first_window)
280         else:
281             shutdown_windows[-1] += first_window
282         self.shutdown_windows = itertools.cycle([60 * n
283                                                  for n in shutdown_windows])
284         self._open_start = self._next_opening
285         self._open_for = next(self.shutdown_windows)
286
287     def _advance_opening(self):
288         while self._next_opening < time.time():
289             self._open_start = self._next_opening
290             self._next_opening += self._open_for + next(self.shutdown_windows)
291             self._open_for = next(self.shutdown_windows)
292
293     def next_opening(self):
294         self._advance_opening()
295         return self._next_opening
296
297     def window_open(self):
298         self._advance_opening()
299         return 0 < (time.time() - self._open_start) < self._open_for
300
301
302 class ComputeNodeMonitorActor(config.actor_class):
303     """Actor to manage a running compute node.
304
305     This actor gets updates about a compute node's cloud and Arvados records.
306     It uses this information to notify subscribers when the node is eligible
307     for shutdown.
308     """
309     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
310                  timer_actor, update_actor, arvados_node=None,
311                  poll_stale_after=600, node_stale_after=3600):
312         super(ComputeNodeMonitorActor, self).__init__()
313         self._later = self.actor_ref.proxy()
314         self._logger = logging.getLogger('arvnodeman.computenode')
315         self._last_log = None
316         self._shutdowns = shutdown_timer
317         self._timer = timer_actor
318         self._update = update_actor
319         self.cloud_node = cloud_node
320         self.cloud_node_start_time = cloud_node_start_time
321         self.poll_stale_after = poll_stale_after
322         self.node_stale_after = node_stale_after
323         self.subscribers = set()
324         self.arvados_node = None
325         self._later.update_arvados_node(arvados_node)
326         self.last_shutdown_opening = None
327         self._later.consider_shutdown()
328
329     def subscribe(self, subscriber):
330         self.subscribers.add(subscriber)
331
332     def _debug(self, msg, *args):
333         if msg == self._last_log:
334             return
335         self._last_log = msg
336         self._logger.debug(msg, *args)
337
338     def _shutdown_eligible(self):
339         if self.arvados_node is None:
340             return timestamp_fresh(self.cloud_node_start_time,
341                                    self.node_stale_after)
342         else:
343             return (timestamp_fresh(arvados_node_mtime(self.arvados_node),
344                                     self.poll_stale_after) and
345                     (self.arvados_node['info'].get('slurm_state') == 'idle'))
346
347     def consider_shutdown(self):
348         next_opening = self._shutdowns.next_opening()
349         if self._shutdowns.window_open():
350             if self._shutdown_eligible():
351                 self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
352                 _notify_subscribers(self._later, self.subscribers)
353             else:
354                 self._debug("Node %s shutdown window open but node busy.",
355                             self.cloud_node.id)
356         else:
357             self._debug("Node %s shutdown window closed.  Next at %s.",
358                         self.cloud_node.id, time.ctime(next_opening))
359         if self.last_shutdown_opening != next_opening:
360             self._timer.schedule(next_opening, self._later.consider_shutdown)
361             self.last_shutdown_opening = next_opening
362
363     def offer_arvados_pair(self, arvados_node):
364         if self.arvados_node is not None:
365             return None
366         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
367             self._later.update_arvados_node(arvados_node)
368             return self.cloud_node.id
369         else:
370             return None
371
372     def update_cloud_node(self, cloud_node):
373         if cloud_node is not None:
374             self.cloud_node = cloud_node
375             self._later.consider_shutdown()
376
377     def update_arvados_node(self, arvados_node):
378         if arvados_node is not None:
379             self.arvados_node = arvados_node
380             new_hostname = arvados_node_fqdn(self.arvados_node)
381             if new_hostname != self.cloud_node.name:
382                 self._update.sync_node(self.cloud_node, self.arvados_node)
383             self._later.consider_shutdown()