Merge branch 'master' into 4358-graph-not-comparing
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
15
16 class ComputeNodeStateChangeBase(config.actor_class):
17     """Base class for actors that change a compute node's state.
18
19     This base class takes care of retrying changes and notifying
20     subscribers when the change is finished.
21     """
22     def __init__(self, logger_name, cloud_client, timer_actor,
23                  retry_wait, max_retry_wait):
24         super(ComputeNodeStateChangeBase, self).__init__()
25         self._later = self.actor_ref.proxy()
26         self._logger = logging.getLogger(logger_name)
27         self._cloud = cloud_client
28         self._timer = timer_actor
29         self.min_retry_wait = retry_wait
30         self.max_retry_wait = max_retry_wait
31         self.retry_wait = retry_wait
32         self.subscribers = set()
33
34     @staticmethod
35     def _retry(errors=()):
36         """Retry decorator for an actor method that makes remote requests.
37
38         Use this function to decorator an actor method, and pass in a
39         tuple of exceptions to catch.  This decorator will schedule
40         retries of that method with exponential backoff if the
41         original method raises a known cloud driver error, or any of the
42         given exception types.
43         """
44         def decorator(orig_func):
45             @functools.wraps(orig_func)
46             def retry_wrapper(self, *args, **kwargs):
47                 start_time = time.time()
48                 try:
49                     orig_func(self, *args, **kwargs)
50                 except Exception as error:
51                     if not (isinstance(error, errors) or
52                             self._cloud.is_cloud_exception(error)):
53                         raise
54                     self._logger.warning(
55                         "Client error: %s - waiting %s seconds",
56                         error, self.retry_wait)
57                     self._timer.schedule(start_time + self.retry_wait,
58                                          getattr(self._later,
59                                                  orig_func.__name__),
60                                          *args, **kwargs)
61                     self.retry_wait = min(self.retry_wait * 2,
62                                           self.max_retry_wait)
63                 else:
64                     self.retry_wait = self.min_retry_wait
65             return retry_wrapper
66         return decorator
67
68     def _finished(self):
69         _notify_subscribers(self._later, self.subscribers)
70         self.subscribers = None
71
72     def subscribe(self, subscriber):
73         if self.subscribers is None:
74             try:
75                 subscriber(self._later)
76             except pykka.ActorDeadError:
77                 pass
78         else:
79             self.subscribers.add(subscriber)
80
81
82 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
83     """Actor to create and set up a cloud compute node.
84
85     This actor prepares an Arvados node record for a new compute node
86     (either creating one or cleaning one passed in), then boots the
87     actual compute node.  It notifies subscribers when the cloud node
88     is successfully created (the last step in the process for Node
89     Manager to handle).
90     """
91     def __init__(self, timer_actor, arvados_client, cloud_client,
92                  cloud_size, arvados_node=None,
93                  retry_wait=1, max_retry_wait=180):
94         super(ComputeNodeSetupActor, self).__init__(
95             'arvnodeman.nodeup', cloud_client, timer_actor,
96             retry_wait, max_retry_wait)
97         self._arvados = arvados_client
98         self.cloud_size = cloud_size
99         self.arvados_node = None
100         self.cloud_node = None
101         if arvados_node is None:
102             self._later.create_arvados_node()
103         else:
104             self._later.prepare_arvados_node(arvados_node)
105
106     @ComputeNodeStateChangeBase._retry()
107     def create_arvados_node(self):
108         self.arvados_node = self._arvados.nodes().create(body={}).execute()
109         self._later.create_cloud_node()
110
111     @ComputeNodeStateChangeBase._retry()
112     def prepare_arvados_node(self, node):
113         self.arvados_node = self._arvados.nodes().update(
114             uuid=node['uuid'],
115             body={'hostname': None,
116                   'ip_address': None,
117                   'slot_number': None,
118                   'first_ping_at': None,
119                   'last_ping_at': None,
120                   'info': {'ec2_instance_id': None,
121                            'last_action': "Prepared by Node Manager"}}
122             ).execute()
123         self._later.create_cloud_node()
124
125     @ComputeNodeStateChangeBase._retry()
126     def create_cloud_node(self):
127         self._logger.info("Creating cloud node with size %s.",
128                           self.cloud_size.name)
129         self.cloud_node = self._cloud.create_node(self.cloud_size,
130                                                   self.arvados_node)
131         self._logger.info("Cloud node %s created.", self.cloud_node.id)
132         self._later.post_create()
133
134     @ComputeNodeStateChangeBase._retry()
135     def post_create(self):
136         self._cloud.post_create_node(self.cloud_node)
137         self._logger.info("%s post-create work done.", self.cloud_node.id)
138         self._finished()
139
140     def stop_if_no_cloud_node(self):
141         if self.cloud_node is None:
142             self.stop()
143
144
145 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
146     """Actor to shut down a compute node.
147
148     This actor simply destroys a cloud node, retrying as needed.
149     """
150     def __init__(self, timer_actor, cloud_client, node_monitor,
151                  cancellable=True, retry_wait=1, max_retry_wait=180):
152         # If a ShutdownActor is cancellable, it will ask the
153         # ComputeNodeMonitorActor if it's still eligible before taking each
154         # action, and stop the shutdown process if the node is no longer
155         # eligible.  Normal shutdowns based on job demand should be
156         # cancellable; shutdowns based on node misbehavior should not.
157         super(ComputeNodeShutdownActor, self).__init__(
158             'arvnodeman.nodedown', cloud_client, timer_actor,
159             retry_wait, max_retry_wait)
160         self._monitor = node_monitor.proxy()
161         self.cloud_node = self._monitor.cloud_node.get()
162         self.cancellable = cancellable
163         self.success = None
164
165     def on_start(self):
166         self._later.shutdown_node()
167
168     def cancel_shutdown(self):
169         self.success = False
170         self._finished()
171
172     def _stop_if_window_closed(orig_func):
173         @functools.wraps(orig_func)
174         def stop_wrapper(self, *args, **kwargs):
175             if (self.cancellable and
176                   (not self._monitor.shutdown_eligible().get())):
177                 self._logger.info(
178                     "Cloud node %s shutdown cancelled - no longer eligible.",
179                     self.cloud_node.id)
180                 self._later.cancel_shutdown()
181                 return None
182             else:
183                 return orig_func(self, *args, **kwargs)
184         return stop_wrapper
185
186     @_stop_if_window_closed
187     @ComputeNodeStateChangeBase._retry()
188     def shutdown_node(self):
189         if self._cloud.destroy_node(self.cloud_node):
190             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
191             self.success = True
192             self._finished()
193         else:
194             # Force a retry.
195             raise cloud_types.LibcloudError("destroy_node failed")
196
197     # Make the decorator available to subclasses.
198     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
199
200
201 class ComputeNodeUpdateActor(config.actor_class):
202     """Actor to dispatch one-off cloud management requests.
203
204     This actor receives requests for small cloud updates, and
205     dispatches them to a real driver.  ComputeNodeMonitorActors use
206     this to perform maintenance tasks on themselves.  Having a
207     dedicated actor for this gives us the opportunity to control the
208     flow of requests; e.g., by backing off when errors occur.
209
210     This actor is most like a "traditional" Pykka actor: there's no
211     subscribing, but instead methods return real driver results.  If
212     you're interested in those results, you should get them from the
213     Future that the proxy method returns.  Be prepared to handle exceptions
214     from the cloud driver when you do.
215     """
216     def __init__(self, cloud_factory, max_retry_wait=180):
217         super(ComputeNodeUpdateActor, self).__init__()
218         self._cloud = cloud_factory()
219         self.max_retry_wait = max_retry_wait
220         self.error_streak = 0
221         self.next_request_time = time.time()
222
223     def _throttle_errors(orig_func):
224         @functools.wraps(orig_func)
225         def throttle_wrapper(self, *args, **kwargs):
226             throttle_time = self.next_request_time - time.time()
227             if throttle_time > 0:
228                 time.sleep(throttle_time)
229             self.next_request_time = time.time()
230             try:
231                 result = orig_func(self, *args, **kwargs)
232             except Exception as error:
233                 self.error_streak += 1
234                 self.next_request_time += min(2 ** self.error_streak,
235                                               self.max_retry_wait)
236                 raise
237             else:
238                 self.error_streak = 0
239                 return result
240         return throttle_wrapper
241
242     @_throttle_errors
243     def sync_node(self, cloud_node, arvados_node):
244         return self._cloud.sync_node(cloud_node, arvados_node)
245
246
247 class ComputeNodeMonitorActor(config.actor_class):
248     """Actor to manage a running compute node.
249
250     This actor gets updates about a compute node's cloud and Arvados records.
251     It uses this information to notify subscribers when the node is eligible
252     for shutdown.
253     """
254     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
255                  timer_actor, update_actor, arvados_node=None,
256                  poll_stale_after=600, node_stale_after=3600):
257         super(ComputeNodeMonitorActor, self).__init__()
258         self._later = self.actor_ref.proxy()
259         self._logger = logging.getLogger('arvnodeman.computenode')
260         self._last_log = None
261         self._shutdowns = shutdown_timer
262         self._timer = timer_actor
263         self._update = update_actor
264         self.cloud_node = cloud_node
265         self.cloud_node_start_time = cloud_node_start_time
266         self.poll_stale_after = poll_stale_after
267         self.node_stale_after = node_stale_after
268         self.subscribers = set()
269         self.arvados_node = None
270         self._later.update_arvados_node(arvados_node)
271         self.last_shutdown_opening = None
272         self._later.consider_shutdown()
273
274     def subscribe(self, subscriber):
275         self.subscribers.add(subscriber)
276
277     def _debug(self, msg, *args):
278         if msg == self._last_log:
279             return
280         self._last_log = msg
281         self._logger.debug(msg, *args)
282
283     def in_state(self, *states):
284         # Return a boolean to say whether or not our Arvados node record is in
285         # one of the given states.  If state information is not
286         # available--because this node has no Arvados record, the record is
287         # stale, or the record has no state information--return None.
288         if (self.arvados_node is None) or not timestamp_fresh(
289               arvados_node_mtime(self.arvados_node), self.node_stale_after):
290             return None
291         state = self.arvados_node['info'].get('slurm_state')
292         if not state:
293             return None
294         result = state in states
295         if state == 'idle':
296             result = result and not self.arvados_node['job_uuid']
297         return result
298
299     def shutdown_eligible(self):
300         if not self._shutdowns.window_open():
301             return False
302         elif self.arvados_node is None:
303             # If this is a new, unpaired node, it's eligible for
304             # shutdown--we figure there was an error during bootstrap.
305             return timestamp_fresh(self.cloud_node_start_time,
306                                    self.node_stale_after)
307         else:
308             return self.in_state('idle')
309
310     def consider_shutdown(self):
311         next_opening = self._shutdowns.next_opening()
312         if self.shutdown_eligible():
313             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
314             _notify_subscribers(self._later, self.subscribers)
315         elif self._shutdowns.window_open():
316             self._debug("Node %s shutdown window open but node busy.",
317                         self.cloud_node.id)
318         elif self.last_shutdown_opening != next_opening:
319             self._debug("Node %s shutdown window closed.  Next at %s.",
320                         self.cloud_node.id, time.ctime(next_opening))
321             self._timer.schedule(next_opening, self._later.consider_shutdown)
322             self.last_shutdown_opening = next_opening
323
324     def offer_arvados_pair(self, arvados_node):
325         if self.arvados_node is not None:
326             return None
327         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
328             self._later.update_arvados_node(arvados_node)
329             return self.cloud_node.id
330         else:
331             return None
332
333     def update_cloud_node(self, cloud_node):
334         if cloud_node is not None:
335             self.cloud_node = cloud_node
336             self._later.consider_shutdown()
337
338     def update_arvados_node(self, arvados_node):
339         if arvados_node is not None:
340             self.arvados_node = arvados_node
341             new_hostname = arvados_node_fqdn(self.arvados_node)
342             if new_hostname != self.cloud_node.name:
343                 self._update.sync_node(self.cloud_node, self.arvados_node)
344             self._later.consider_shutdown()