4380: Node Manager shutdown actor is more robust.
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
15
16 class ComputeNodeStateChangeBase(config.actor_class):
17     """Base class for actors that change a compute node's state.
18
19     This base class takes care of retrying changes and notifying
20     subscribers when the change is finished.
21     """
22     def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
23         super(ComputeNodeStateChangeBase, self).__init__()
24         self._later = self.actor_ref.proxy()
25         self._timer = timer_actor
26         self._logger = logging.getLogger(logger_name)
27         self.min_retry_wait = retry_wait
28         self.max_retry_wait = max_retry_wait
29         self.retry_wait = retry_wait
30         self.subscribers = set()
31
32     @staticmethod
33     def _retry(errors):
34         """Retry decorator for an actor method that makes remote requests.
35
36         Use this function to decorator an actor method, and pass in a
37         tuple of exceptions to catch.  This decorator will schedule
38         retries of that method with exponential backoff if the
39         original method raises any of the given errors.
40         """
41         def decorator(orig_func):
42             @functools.wraps(orig_func)
43             def wrapper(self, *args, **kwargs):
44                 try:
45                     orig_func(self, *args, **kwargs)
46                 except errors as error:
47                     self._logger.warning(
48                         "Client error: %s - waiting %s seconds",
49                         error, self.retry_wait)
50                     self._timer.schedule(self.retry_wait,
51                                          getattr(self._later,
52                                                  orig_func.__name__),
53                                          *args, **kwargs)
54                     self.retry_wait = min(self.retry_wait * 2,
55                                           self.max_retry_wait)
56                 else:
57                     self.retry_wait = self.min_retry_wait
58             return wrapper
59         return decorator
60
61     def _finished(self):
62         _notify_subscribers(self._later, self.subscribers)
63         self.subscribers = None
64
65     def subscribe(self, subscriber):
66         if self.subscribers is None:
67             try:
68                 subscriber(self._later)
69             except pykka.ActorDeadError:
70                 pass
71         else:
72             self.subscribers.add(subscriber)
73
74
75 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
76     """Actor to create and set up a cloud compute node.
77
78     This actor prepares an Arvados node record for a new compute node
79     (either creating one or cleaning one passed in), then boots the
80     actual compute node.  It notifies subscribers when the cloud node
81     is successfully created (the last step in the process for Node
82     Manager to handle).
83     """
84     def __init__(self, timer_actor, arvados_client, cloud_client,
85                  cloud_size, arvados_node=None,
86                  retry_wait=1, max_retry_wait=180):
87         super(ComputeNodeSetupActor, self).__init__(
88             'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
89         self._arvados = arvados_client
90         self._cloud = cloud_client
91         self.cloud_size = cloud_size
92         self.arvados_node = None
93         self.cloud_node = None
94         if arvados_node is None:
95             self._later.create_arvados_node()
96         else:
97             self._later.prepare_arvados_node(arvados_node)
98
99     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
100     def create_arvados_node(self):
101         self.arvados_node = self._arvados.nodes().create(body={}).execute()
102         self._later.create_cloud_node()
103
104     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
105     def prepare_arvados_node(self, node):
106         self.arvados_node = self._arvados.nodes().update(
107             uuid=node['uuid'],
108             body={'hostname': None,
109                   'ip_address': None,
110                   'slot_number': None,
111                   'first_ping_at': None,
112                   'last_ping_at': None,
113                   'info': {'ec2_instance_id': None,
114                            'last_action': "Prepared by Node Manager"}}
115             ).execute()
116         self._later.create_cloud_node()
117
118     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
119     def create_cloud_node(self):
120         self._logger.info("Creating cloud node with size %s.",
121                           self.cloud_size.name)
122         self.cloud_node = self._cloud.create_node(self.cloud_size,
123                                                   self.arvados_node)
124         self._logger.info("Cloud node %s created.", self.cloud_node.id)
125         self._finished()
126
127     def stop_if_no_cloud_node(self):
128         if self.cloud_node is None:
129             self.stop()
130
131
132 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
133     """Actor to shut down a compute node.
134
135     This actor simply destroys a cloud node, retrying as needed.
136     """
137     def __init__(self, timer_actor, cloud_client, node_monitor,
138                  retry_wait=1, max_retry_wait=180):
139         super(ComputeNodeShutdownActor, self).__init__(
140             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
141         self._cloud = cloud_client
142         self._monitor = node_monitor.proxy()
143         self.cloud_node = self._monitor.cloud_node.get()
144         self.success = None
145
146     def on_start(self):
147         self._later.shutdown_node()
148
149     def cancel_shutdown(self):
150         self.success = False
151         self._finished()
152
153     def _stop_if_window_closed(orig_func):
154         @functools.wraps(orig_func)
155         def wrapper(self, *args, **kwargs):
156             if not self._monitor.shutdown_eligible().get():
157                 self._logger.info(
158                     "Cloud node %s shutdown cancelled - no longer eligible.",
159                     self.cloud_node.id)
160                 self._later.cancel_shutdown()
161                 return None
162             else:
163                 return orig_func(self, *args, **kwargs)
164         return wrapper
165
166     @_stop_if_window_closed
167     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
168     def shutdown_node(self):
169         if self._cloud.destroy_node(self.cloud_node):
170             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
171             self.success = True
172             self._finished()
173         else:
174             # Force a retry.
175             raise cloud_types.LibcloudError("destroy_node failed")
176
177     # Make the decorator available to subclasses.
178     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
179
180
181 class ComputeNodeUpdateActor(config.actor_class):
182     """Actor to dispatch one-off cloud management requests.
183
184     This actor receives requests for small cloud updates, and
185     dispatches them to a real driver.  ComputeNodeMonitorActors use
186     this to perform maintenance tasks on themselves.  Having a
187     dedicated actor for this gives us the opportunity to control the
188     flow of requests; e.g., by backing off when errors occur.
189
190     This actor is most like a "traditional" Pykka actor: there's no
191     subscribing, but instead methods return real driver results.  If
192     you're interested in those results, you should get them from the
193     Future that the proxy method returns.  Be prepared to handle exceptions
194     from the cloud driver when you do.
195     """
196     def __init__(self, cloud_factory, max_retry_wait=180):
197         super(ComputeNodeUpdateActor, self).__init__()
198         self._cloud = cloud_factory()
199         self.max_retry_wait = max_retry_wait
200         self.error_streak = 0
201         self.next_request_time = time.time()
202
203     def _throttle_errors(orig_func):
204         @functools.wraps(orig_func)
205         def wrapper(self, *args, **kwargs):
206             throttle_time = self.next_request_time - time.time()
207             if throttle_time > 0:
208                 time.sleep(throttle_time)
209             self.next_request_time = time.time()
210             try:
211                 result = orig_func(self, *args, **kwargs)
212             except config.CLOUD_ERRORS:
213                 self.error_streak += 1
214                 self.next_request_time += min(2 ** self.error_streak,
215                                               self.max_retry_wait)
216                 raise
217             else:
218                 self.error_streak = 0
219                 return result
220         return wrapper
221
222     @_throttle_errors
223     def sync_node(self, cloud_node, arvados_node):
224         return self._cloud.sync_node(cloud_node, arvados_node)
225
226
227 class ComputeNodeMonitorActor(config.actor_class):
228     """Actor to manage a running compute node.
229
230     This actor gets updates about a compute node's cloud and Arvados records.
231     It uses this information to notify subscribers when the node is eligible
232     for shutdown.
233     """
234     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
235                  timer_actor, update_actor, arvados_node=None,
236                  poll_stale_after=600, node_stale_after=3600):
237         super(ComputeNodeMonitorActor, self).__init__()
238         self._later = self.actor_ref.proxy()
239         self._logger = logging.getLogger('arvnodeman.computenode')
240         self._last_log = None
241         self._shutdowns = shutdown_timer
242         self._timer = timer_actor
243         self._update = update_actor
244         self.cloud_node = cloud_node
245         self.cloud_node_start_time = cloud_node_start_time
246         self.poll_stale_after = poll_stale_after
247         self.node_stale_after = node_stale_after
248         self.subscribers = set()
249         self.arvados_node = None
250         self._later.update_arvados_node(arvados_node)
251         self.last_shutdown_opening = None
252         self._later.consider_shutdown()
253
254     def subscribe(self, subscriber):
255         self.subscribers.add(subscriber)
256
257     def _debug(self, msg, *args):
258         if msg == self._last_log:
259             return
260         self._last_log = msg
261         self._logger.debug(msg, *args)
262
263     def in_state(self, *states):
264         # Return a boolean to say whether or not our Arvados node record is in
265         # one of the given states.  If state information is not
266         # available--because this node has no Arvados record, the record is
267         # stale, or the record has no state information--return None.
268         if (self.arvados_node is None) or not timestamp_fresh(
269               arvados_node_mtime(self.arvados_node), self.node_stale_after):
270             return None
271         state = self.arvados_node['info'].get('slurm_state')
272         if not state:
273             return None
274         result = state in states
275         if state == 'idle':
276             result = result and not self.arvados_node['job_uuid']
277         return result
278
279     def shutdown_eligible(self):
280         if not self._shutdowns.window_open():
281             return False
282         elif self.arvados_node is None:
283             # If this is a new, unpaired node, it's eligible for
284             # shutdown--we figure there was an error during bootstrap.
285             return timestamp_fresh(self.cloud_node_start_time,
286                                    self.node_stale_after)
287         else:
288             return self.in_state('idle')
289
290     def consider_shutdown(self):
291         next_opening = self._shutdowns.next_opening()
292         if self.shutdown_eligible():
293             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
294             _notify_subscribers(self._later, self.subscribers)
295         elif self._shutdowns.window_open():
296             self._debug("Node %s shutdown window open but node busy.",
297                         self.cloud_node.id)
298         elif self.last_shutdown_opening != next_opening:
299             self._debug("Node %s shutdown window closed.  Next at %s.",
300                         self.cloud_node.id, time.ctime(next_opening))
301             self._timer.schedule(next_opening, self._later.consider_shutdown)
302             self.last_shutdown_opening = next_opening
303
304     def offer_arvados_pair(self, arvados_node):
305         if self.arvados_node is not None:
306             return None
307         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
308             self._later.update_arvados_node(arvados_node)
309             return self.cloud_node.id
310         else:
311             return None
312
313     def update_cloud_node(self, cloud_node):
314         if cloud_node is not None:
315             self.cloud_node = cloud_node
316             self._later.consider_shutdown()
317
318     def update_arvados_node(self, arvados_node):
319         if arvados_node is not None:
320             self.arvados_node = arvados_node
321             new_hostname = arvados_node_fqdn(self.arvados_node)
322             if new_hostname != self.cloud_node.name:
323                 self._update.sync_node(self.cloud_node, self.arvados_node)
324             self._later.consider_shutdown()