Merge branch '4499-one-task-per-input-file-normalize'
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import arvados_node_fqdn, arvados_node_mtime, timestamp_fresh
13 from ...clientactor import _notify_subscribers
14 from ... import config
15
16 class ComputeNodeStateChangeBase(config.actor_class):
17     """Base class for actors that change a compute node's state.
18
19     This base class takes care of retrying changes and notifying
20     subscribers when the change is finished.
21     """
22     def __init__(self, logger_name, timer_actor, retry_wait, max_retry_wait):
23         super(ComputeNodeStateChangeBase, self).__init__()
24         self._later = self.actor_ref.proxy()
25         self._timer = timer_actor
26         self._logger = logging.getLogger(logger_name)
27         self.min_retry_wait = retry_wait
28         self.max_retry_wait = max_retry_wait
29         self.retry_wait = retry_wait
30         self.subscribers = set()
31
32     @staticmethod
33     def _retry(errors):
34         """Retry decorator for an actor method that makes remote requests.
35
36         Use this function to decorator an actor method, and pass in a
37         tuple of exceptions to catch.  This decorator will schedule
38         retries of that method with exponential backoff if the
39         original method raises any of the given errors.
40         """
41         def decorator(orig_func):
42             @functools.wraps(orig_func)
43             def wrapper(self, *args, **kwargs):
44                 start_time = time.time()
45                 try:
46                     orig_func(self, *args, **kwargs)
47                 except errors as error:
48                     self._logger.warning(
49                         "Client error: %s - waiting %s seconds",
50                         error, self.retry_wait)
51                     self._timer.schedule(start_time + self.retry_wait,
52                                          getattr(self._later,
53                                                  orig_func.__name__),
54                                          *args, **kwargs)
55                     self.retry_wait = min(self.retry_wait * 2,
56                                           self.max_retry_wait)
57                 else:
58                     self.retry_wait = self.min_retry_wait
59             return wrapper
60         return decorator
61
62     def _finished(self):
63         _notify_subscribers(self._later, self.subscribers)
64         self.subscribers = None
65
66     def subscribe(self, subscriber):
67         if self.subscribers is None:
68             try:
69                 subscriber(self._later)
70             except pykka.ActorDeadError:
71                 pass
72         else:
73             self.subscribers.add(subscriber)
74
75
76 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
77     """Actor to create and set up a cloud compute node.
78
79     This actor prepares an Arvados node record for a new compute node
80     (either creating one or cleaning one passed in), then boots the
81     actual compute node.  It notifies subscribers when the cloud node
82     is successfully created (the last step in the process for Node
83     Manager to handle).
84     """
85     def __init__(self, timer_actor, arvados_client, cloud_client,
86                  cloud_size, arvados_node=None,
87                  retry_wait=1, max_retry_wait=180):
88         super(ComputeNodeSetupActor, self).__init__(
89             'arvnodeman.nodeup', timer_actor, retry_wait, max_retry_wait)
90         self._arvados = arvados_client
91         self._cloud = cloud_client
92         self.cloud_size = cloud_size
93         self.arvados_node = None
94         self.cloud_node = None
95         if arvados_node is None:
96             self._later.create_arvados_node()
97         else:
98             self._later.prepare_arvados_node(arvados_node)
99
100     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
101     def create_arvados_node(self):
102         self.arvados_node = self._arvados.nodes().create(body={}).execute()
103         self._later.create_cloud_node()
104
105     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
106     def prepare_arvados_node(self, node):
107         self.arvados_node = self._arvados.nodes().update(
108             uuid=node['uuid'],
109             body={'hostname': None,
110                   'ip_address': None,
111                   'slot_number': None,
112                   'first_ping_at': None,
113                   'last_ping_at': None,
114                   'info': {'ec2_instance_id': None,
115                            'last_action': "Prepared by Node Manager"}}
116             ).execute()
117         self._later.create_cloud_node()
118
119     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
120     def create_cloud_node(self):
121         self._logger.info("Creating cloud node with size %s.",
122                           self.cloud_size.name)
123         self.cloud_node = self._cloud.create_node(self.cloud_size,
124                                                   self.arvados_node)
125         self._logger.info("Cloud node %s created.", self.cloud_node.id)
126         self._finished()
127
128     def stop_if_no_cloud_node(self):
129         if self.cloud_node is None:
130             self.stop()
131
132
133 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
134     """Actor to shut down a compute node.
135
136     This actor simply destroys a cloud node, retrying as needed.
137     """
138     def __init__(self, timer_actor, cloud_client, node_monitor,
139                  cancellable=True, retry_wait=1, max_retry_wait=180):
140         # If a ShutdownActor is cancellable, it will ask the
141         # ComputeNodeMonitorActor if it's still eligible before taking each
142         # action, and stop the shutdown process if the node is no longer
143         # eligible.  Normal shutdowns based on job demand should be
144         # cancellable; shutdowns based on node misbehavior should not.
145         super(ComputeNodeShutdownActor, self).__init__(
146             'arvnodeman.nodedown', timer_actor, retry_wait, max_retry_wait)
147         self._cloud = cloud_client
148         self._monitor = node_monitor.proxy()
149         self.cloud_node = self._monitor.cloud_node.get()
150         self.cancellable = cancellable
151         self.success = None
152
153     def on_start(self):
154         self._later.shutdown_node()
155
156     def cancel_shutdown(self):
157         self.success = False
158         self._finished()
159
160     def _stop_if_window_closed(orig_func):
161         @functools.wraps(orig_func)
162         def wrapper(self, *args, **kwargs):
163             if (self.cancellable and
164                   (not self._monitor.shutdown_eligible().get())):
165                 self._logger.info(
166                     "Cloud node %s shutdown cancelled - no longer eligible.",
167                     self.cloud_node.id)
168                 self._later.cancel_shutdown()
169                 return None
170             else:
171                 return orig_func(self, *args, **kwargs)
172         return wrapper
173
174     @_stop_if_window_closed
175     @ComputeNodeStateChangeBase._retry(config.CLOUD_ERRORS)
176     def shutdown_node(self):
177         if self._cloud.destroy_node(self.cloud_node):
178             self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
179             self.success = True
180             self._finished()
181         else:
182             # Force a retry.
183             raise cloud_types.LibcloudError("destroy_node failed")
184
185     # Make the decorator available to subclasses.
186     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
187
188
189 class ComputeNodeUpdateActor(config.actor_class):
190     """Actor to dispatch one-off cloud management requests.
191
192     This actor receives requests for small cloud updates, and
193     dispatches them to a real driver.  ComputeNodeMonitorActors use
194     this to perform maintenance tasks on themselves.  Having a
195     dedicated actor for this gives us the opportunity to control the
196     flow of requests; e.g., by backing off when errors occur.
197
198     This actor is most like a "traditional" Pykka actor: there's no
199     subscribing, but instead methods return real driver results.  If
200     you're interested in those results, you should get them from the
201     Future that the proxy method returns.  Be prepared to handle exceptions
202     from the cloud driver when you do.
203     """
204     def __init__(self, cloud_factory, max_retry_wait=180):
205         super(ComputeNodeUpdateActor, self).__init__()
206         self._cloud = cloud_factory()
207         self.max_retry_wait = max_retry_wait
208         self.error_streak = 0
209         self.next_request_time = time.time()
210
211     def _throttle_errors(orig_func):
212         @functools.wraps(orig_func)
213         def wrapper(self, *args, **kwargs):
214             throttle_time = self.next_request_time - time.time()
215             if throttle_time > 0:
216                 time.sleep(throttle_time)
217             self.next_request_time = time.time()
218             try:
219                 result = orig_func(self, *args, **kwargs)
220             except config.CLOUD_ERRORS:
221                 self.error_streak += 1
222                 self.next_request_time += min(2 ** self.error_streak,
223                                               self.max_retry_wait)
224                 raise
225             else:
226                 self.error_streak = 0
227                 return result
228         return wrapper
229
230     @_throttle_errors
231     def sync_node(self, cloud_node, arvados_node):
232         return self._cloud.sync_node(cloud_node, arvados_node)
233
234
235 class ComputeNodeMonitorActor(config.actor_class):
236     """Actor to manage a running compute node.
237
238     This actor gets updates about a compute node's cloud and Arvados records.
239     It uses this information to notify subscribers when the node is eligible
240     for shutdown.
241     """
242     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
243                  timer_actor, update_actor, arvados_node=None,
244                  poll_stale_after=600, node_stale_after=3600):
245         super(ComputeNodeMonitorActor, self).__init__()
246         self._later = self.actor_ref.proxy()
247         self._logger = logging.getLogger('arvnodeman.computenode')
248         self._last_log = None
249         self._shutdowns = shutdown_timer
250         self._timer = timer_actor
251         self._update = update_actor
252         self.cloud_node = cloud_node
253         self.cloud_node_start_time = cloud_node_start_time
254         self.poll_stale_after = poll_stale_after
255         self.node_stale_after = node_stale_after
256         self.subscribers = set()
257         self.arvados_node = None
258         self._later.update_arvados_node(arvados_node)
259         self.last_shutdown_opening = None
260         self._later.consider_shutdown()
261
262     def subscribe(self, subscriber):
263         self.subscribers.add(subscriber)
264
265     def _debug(self, msg, *args):
266         if msg == self._last_log:
267             return
268         self._last_log = msg
269         self._logger.debug(msg, *args)
270
271     def in_state(self, *states):
272         # Return a boolean to say whether or not our Arvados node record is in
273         # one of the given states.  If state information is not
274         # available--because this node has no Arvados record, the record is
275         # stale, or the record has no state information--return None.
276         if (self.arvados_node is None) or not timestamp_fresh(
277               arvados_node_mtime(self.arvados_node), self.node_stale_after):
278             return None
279         state = self.arvados_node['info'].get('slurm_state')
280         if not state:
281             return None
282         result = state in states
283         if state == 'idle':
284             result = result and not self.arvados_node['job_uuid']
285         return result
286
287     def shutdown_eligible(self):
288         if not self._shutdowns.window_open():
289             return False
290         elif self.arvados_node is None:
291             # If this is a new, unpaired node, it's eligible for
292             # shutdown--we figure there was an error during bootstrap.
293             return timestamp_fresh(self.cloud_node_start_time,
294                                    self.node_stale_after)
295         else:
296             return self.in_state('idle')
297
298     def consider_shutdown(self):
299         next_opening = self._shutdowns.next_opening()
300         if self.shutdown_eligible():
301             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
302             _notify_subscribers(self._later, self.subscribers)
303         elif self._shutdowns.window_open():
304             self._debug("Node %s shutdown window open but node busy.",
305                         self.cloud_node.id)
306         elif self.last_shutdown_opening != next_opening:
307             self._debug("Node %s shutdown window closed.  Next at %s.",
308                         self.cloud_node.id, time.ctime(next_opening))
309             self._timer.schedule(next_opening, self._later.consider_shutdown)
310             self.last_shutdown_opening = next_opening
311
312     def offer_arvados_pair(self, arvados_node):
313         if self.arvados_node is not None:
314             return None
315         elif arvados_node['ip_address'] in self.cloud_node.private_ips:
316             self._later.update_arvados_node(arvados_node)
317             return self.cloud_node.id
318         else:
319             return None
320
321     def update_cloud_node(self, cloud_node):
322         if cloud_node is not None:
323             self.cloud_node = cloud_node
324             self._later.consider_shutdown()
325
326     def update_arvados_node(self, arvados_node):
327         if arvados_node is not None:
328             self.arvados_node = arvados_node
329             new_hostname = arvados_node_fqdn(self.arvados_node)
330             if new_hostname != self.cloud_node.name:
331                 self._update.sync_node(self.cloud_node, self.arvados_node)
332             self._later.consider_shutdown()