Merge branch 'master' into 4233-graph-job-stats
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
15
16 class ComputeNodeStateChangeBase(config.actor_class):
17     """Base class for actors that change a compute node's state.
18
19     This base class takes care of retrying changes and notifying
20     subscribers when the change is finished.
21     """
22     def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
23         super(ComputeNodeStateChangeBase, self).__init__()
24         self._later = self.actor_ref.proxy()
25         self._timer = timer_actor
26         self._logger = logging.getLogger(logger_name)
27         self.min_retry_wait = retry_wait
28         self.max_retry_wait = max_retry_wait
29         self.retry_wait = retry_wait
30         self.subscribers = set()
31
32     @staticmethod
33     def _retry(errors):
34         """Retry decorator for an actor method that makes remote requests.
35
36         Use this function to decorator an actor method, and pass in a
37         tuple of exceptions to catch.  This decorator will schedule
38         retries of that method with exponential backoff if the
39         original method raises any of the given errors.
40         """
41         def decorator(orig_func):
42             @functools.wraps(orig_func)
43             def wrapper(self, *args, **kwargs):
44                 start_time = time.time()
45                 try:
46                     orig_func(self, *args, **kwargs)
47                 except errors as error:
48                     self._logger.warning(
49                         "Client error: %s - waiting %s seconds",
50                         error, self.retry_wait)
51                     self._timer.schedule(start_time + self.retry_wait,
52                                          getattr(self._later,
53                                                  orig_func.__name__),
54                                          *args, **kwargs)
55                     self.retry_wait = min(self.retry_wait * 2,
56                                           self.max_retry_wait)
57                 else:
58                     self.retry_wait = self.min_retry_wait
59             return wrapper
60         return decorator
61
62     def _finished(self):
63         _notify_subscribers(self._later, self.subscribers)
64         self.subscribers = None
65
66     def subscribe(self, subscriber):
67         if self.subscribers is None:
68             try:
69                 subscriber(self._later)
70             except pykka.ActorDeadError:
71                 pass
72         else:
73             self.subscribers.add(subscriber)
74
75
76 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
77     """Actor to create and set up a cloud compute node.
78
79     This actor prepares an Arvados node record for a new compute node
80     (either creating one or cleaning one passed in), then boots the
81     actual compute node.  It notifies subscribers when the cloud node
82     is successfully created (the last step in the process for Node
83     Manager to handle).
84     """
85     def __init__(self, timer_actor, arvados_client, cloud_client,
86                  cloud_size, arvados_node=None,
87                  retry_wait=1, max_retry_wait=180):
88         super(ComputeNodeSetupActor, self).__init__(
89             'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
90         self._arvados = arvados_client
91         self._cloud = cloud_client
92         self.cloud_size = cloud_size
93         self.arvados_node = None
94         self.cloud_node = None
95         if arvados_node is None:
96             self._later.create_arvados_node()
97         else:
98             self._later.prepare_arvados_node(arvados_node)
99
100     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
101     def create_arvados_node(self):
102         self.arvados_node = self._arvados.nodes().create(body={}).execute()
103         self._later.create_cloud_node()
104
105     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
106     def prepare_arvados_node(self, node):
107         self.arvados_node = self._arvados.nodes().update(
108             uuid=node['uuid'],
109             body={'hostname': None,
110                   'ip_address': None,
111                   'slot_number': None,
112                   'first_ping_at': None,
113                   'last_ping_at': None,
114                   'info': {'ec2_instance_id': None,
115                            'last_action': "Prepared by Node Manager"}}
116             ).execute()
117         self._later.create_cloud_node()
118
119     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
120     def create_cloud_node(self):
121         self._logger.info("Creating cloud node with size %s.",
122                           self.cloud_size.name)
123         self.cloud_node = self._cloud.create_node(self.cloud_size,
124                                                   self.arvados_node)
125         self._logger.info("Cloud node %s created.", self.cloud_node.id)
126         self._finished()
127
128     def stop_if_no_cloud_node(self):
129         if self.cloud_node is None:
130             self.stop()
131
132
133 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
134     """Actor to shut down a compute node.
135
136     This actor simply destroys a cloud node, retrying as needed.
137     """
138     def __init__(self, timer_actor, cloud_client, node_monitor,
139                  retry_wait=1, max_retry_wait=180):
140         super(ComputeNodeShutdownActor, self).__init__(
141             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
142         self._cloud = cloud_client
143         self._monitor = node_monitor.proxy()
144         self.cloud_node = self._monitor.cloud_node.get()
145         self.success = None
146
147     def on_start(self):
148         self._later.shutdown_node()
149
150     def cancel_shutdown(self):
151         self.success = False
152         self._finished()
153
154     def _stop_if_window_closed(orig_func):
155         @functools.wraps(orig_func)
156         def wrapper(self, *args, **kwargs):
157             if not self._monitor.shutdown_eligible().get():
158                 self._logger.info(
159                     "Cloud node %s shutdown cancelled - no longer eligible.",
160                     self.cloud_node.id)
161                 self._later.cancel_shutdown()
162                 return None
163             else:
164                 return orig_func(self, *args, **kwargs)
165         return wrapper
166
167     @_stop_if_window_closed
168     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
169     def shutdown_node(self):
170         if self._cloud.destroy_node(self.cloud_node):
171             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
172             self.success = True
173             self._finished()
174         else:
175             # Force a retry.
176             raise cloud_types.LibcloudError("destroy_node failed")
177
178     # Make the decorator available to subclasses.
179     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
180
181
182 class ComputeNodeUpdateActor(config.actor_class):
183     """Actor to dispatch one-off cloud management requests.
184
185     This actor receives requests for small cloud updates, and
186     dispatches them to a real driver.  ComputeNodeMonitorActors use
187     this to perform maintenance tasks on themselves.  Having a
188     dedicated actor for this gives us the opportunity to control the
189     flow of requests; e.g., by backing off when errors occur.
190
191     This actor is most like a "traditional" Pykka actor: there's no
192     subscribing, but instead methods return real driver results.  If
193     you're interested in those results, you should get them from the
194     Future that the proxy method returns.  Be prepared to handle exceptions
195     from the cloud driver when you do.
196     """
197     def __init__(self, cloud_factory, max_retry_wait=180):
198         super(ComputeNodeUpdateActor, self).__init__()
199         self._cloud = cloud_factory()
200         self.max_retry_wait = max_retry_wait
201         self.error_streak = 0
202         self.next_request_time = time.time()
203
204     def _throttle_errors(orig_func):
205         @functools.wraps(orig_func)
206         def wrapper(self, *args, **kwargs):
207             throttle_time = self.next_request_time - time.time()
208             if throttle_time > 0:
209                 time.sleep(throttle_time)
210             self.next_request_time = time.time()
211             try:
212                 result = orig_func(self, *args, **kwargs)
213             except config.CLOUD_ERRORS:
214                 self.error_streak += 1
215                 self.next_request_time += min(2 ** self.error_streak,
216                                               self.max_retry_wait)
217                 raise
218             else:
219                 self.error_streak = 0
220                 return result
221         return wrapper
222
223     @_throttle_errors
224     def sync_node(self, cloud_node, arvados_node):
225         return self._cloud.sync_node(cloud_node, arvados_node)
226
227
228 class ComputeNodeMonitorActor(config.actor_class):
229     """Actor to manage a running compute node.
230
231     This actor gets updates about a compute node's cloud and Arvados records.
232     It uses this information to notify subscribers when the node is eligible
233     for shutdown.
234     """
235     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
236                  timer_actor, update_actor, arvados_node=None,
237                  poll_stale_after=600, node_stale_after=3600):
238         super(ComputeNodeMonitorActor, self).__init__()
239         self._later = self.actor_ref.proxy()
240         self._logger = logging.getLogger('arvnodeman.computenode')
241         self._last_log = None
242         self._shutdowns = shutdown_timer
243         self._timer = timer_actor
244         self._update = update_actor
245         self.cloud_node = cloud_node
246         self.cloud_node_start_time = cloud_node_start_time
247         self.poll_stale_after = poll_stale_after
248         self.node_stale_after = node_stale_after
249         self.subscribers = set()
250         self.arvados_node = None
251         self._later.update_arvados_node(arvados_node)
252         self.last_shutdown_opening = None
253         self._later.consider_shutdown()
254
255     def subscribe(self, subscriber):
256         self.subscribers.add(subscriber)
257
258     def _debug(self, msg, *args):
259         if msg == self._last_log:
260             return
261         self._last_log = msg
262         self._logger.debug(msg, *args)
263
264     def in_state(self, *states):
265         # Return a boolean to say whether or not our Arvados node record is in
266         # one of the given states.  If state information is not
267         # available--because this node has no Arvados record, the record is
268         # stale, or the record has no state information--return None.
269         if (self.arvados_node is None) or not timestamp_fresh(
270               arvados_node_mtime(self.arvados_node), self.node_stale_after):
271             return None
272         state = self.arvados_node['info'].get('slurm_state')
273         if not state:
274             return None
275         result = state in states
276         if state == 'idle':
277             result = result and not self.arvados_node['job_uuid']
278         return result
279
280     def shutdown_eligible(self):
281         if not self._shutdowns.window_open():
282             return False
283         elif self.arvados_node is None:
284             # If this is a new, unpaired node, it's eligible for
285             # shutdown--we figure there was an error during bootstrap.
286             return timestamp_fresh(self.cloud_node_start_time,
287                                    self.node_stale_after)
288         else:
289             return self.in_state('idle')
290
291     def consider_shutdown(self):
292         next_opening = self._shutdowns.next_opening()
293         if self.shutdown_eligible():
294             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
295             _notify_subscribers(self._later, self.subscribers)
296         elif self._shutdowns.window_open():
297             self._debug("Node %s shutdown window open but node busy.",
298                         self.cloud_node.id)
299         elif self.last_shutdown_opening != next_opening:
300             self._debug("Node %s shutdown window closed.  Next at %s.",
301                         self.cloud_node.id, time.ctime(next_opening))
302             self._timer.schedule(next_opening, self._later.consider_shutdown)
303             self.last_shutdown_opening = next_opening
304
305     def offer_arvados_pair(self, arvados_node):
306         if self.arvados_node is not None:
307             return None
308         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
309             self._later.update_arvados_node(arvados_node)
310             return self.cloud_node.id
311         else:
312             return None
313
314     def update_cloud_node(self, cloud_node):
315         if cloud_node is not None:
316             self.cloud_node = cloud_node
317             self._later.consider_shutdown()
318
319     def update_arvados_node(self, arvados_node):
320         if arvados_node is not None:
321             self.arvados_node = arvados_node
322             new_hostname = arvados_node_fqdn(self.arvados_node)
323             if new_hostname != self.cloud_node.name:
324                 self._update.sync_node(self.cloud_node, self.arvados_node)
325             self._later.consider_shutdown()