Merge branch '7711-record-node-price' closes #7711
[arvados.git] / services / nodemanager / arvnodeman / computenode / dispatch / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import functools
6 import logging
7 import time
8
9 import libcloud.common.types as cloud_types
10 import pykka
11
12 from .. import \
13     arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
14 from ...clientactor import _notify_subscribers
15 from ... import config
16
17 class ComputeNodeStateChangeBase(config.actor_class):
18     """Base class for actors that change a compute node's state.
19
20     This base class takes care of retrying changes and notifying
21     subscribers when the change is finished.
22     """
23     def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
24                  retry_wait, max_retry_wait):
25         super(ComputeNodeStateChangeBase, self).__init__()
26         self._later = self.actor_ref.proxy()
27         self._logger = logging.getLogger(logger_name)
28         self._cloud = cloud_client
29         self._arvados = arvados_client
30         self._timer = timer_actor
31         self.min_retry_wait = retry_wait
32         self.max_retry_wait = max_retry_wait
33         self.retry_wait = retry_wait
34         self.subscribers = set()
35
36     @staticmethod
37     def _retry(errors=()):
38         """Retry decorator for an actor method that makes remote requests.
39
40         Use this function to decorator an actor method, and pass in a
41         tuple of exceptions to catch.  This decorator will schedule
42         retries of that method with exponential backoff if the
43         original method raises a known cloud driver error, or any of the
44         given exception types.
45         """
46         def decorator(orig_func):
47             @functools.wraps(orig_func)
48             def retry_wrapper(self, *args, **kwargs):
49                 start_time = time.time()
50                 try:
51                     orig_func(self, *args, **kwargs)
52                 except Exception as error:
53                     if not (isinstance(error, errors) or
54                             self._cloud.is_cloud_exception(error)):
55                         raise
56                     self._logger.warning(
57                         "Client error: %s - waiting %s seconds",
58                         error, self.retry_wait)
59                     self._timer.schedule(start_time + self.retry_wait,
60                                          getattr(self._later,
61                                                  orig_func.__name__),
62                                          *args, **kwargs)
63                     self.retry_wait = min(self.retry_wait * 2,
64                                           self.max_retry_wait)
65                 else:
66                     self.retry_wait = self.min_retry_wait
67             return retry_wrapper
68         return decorator
69
70     def _finished(self):
71         _notify_subscribers(self._later, self.subscribers)
72         self.subscribers = None
73
74     def subscribe(self, subscriber):
75         if self.subscribers is None:
76             try:
77                 subscriber(self._later)
78             except pykka.ActorDeadError:
79                 pass
80         else:
81             self.subscribers.add(subscriber)
82
83     def _clean_arvados_node(self, arvados_node, explanation):
84         return self._arvados.nodes().update(
85             uuid=arvados_node['uuid'],
86             body={'hostname': None,
87                   'ip_address': None,
88                   'slot_number': None,
89                   'first_ping_at': None,
90                   'last_ping_at': None,
91                   'properties': {},
92                   'info': {'ec2_instance_id': None,
93                            'last_action': explanation}},
94             ).execute()
95
96
97 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
98     """Actor to create and set up a cloud compute node.
99
100     This actor prepares an Arvados node record for a new compute node
101     (either creating one or cleaning one passed in), then boots the
102     actual compute node.  It notifies subscribers when the cloud node
103     is successfully created (the last step in the process for Node
104     Manager to handle).
105     """
106     def __init__(self, timer_actor, arvados_client, cloud_client,
107                  cloud_size, arvados_node=None,
108                  retry_wait=1, max_retry_wait=180):
109         super(ComputeNodeSetupActor, self).__init__(
110             'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
111             retry_wait, max_retry_wait)
112         self.cloud_size = cloud_size
113         self.arvados_node = None
114         self.cloud_node = None
115         if arvados_node is None:
116             self._later.create_arvados_node()
117         else:
118             self._later.prepare_arvados_node(arvados_node)
119
120     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
121     def create_arvados_node(self):
122         self.arvados_node = self._arvados.nodes().create(body={}).execute()
123         self._later.create_cloud_node()
124
125     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
126     def prepare_arvados_node(self, node):
127         self.arvados_node = self._clean_arvados_node(
128             node, "Prepared by Node Manager")
129         self._later.create_cloud_node()
130
131     @ComputeNodeStateChangeBase._retry()
132     def create_cloud_node(self):
133         self._logger.info("Creating cloud node with size %s.",
134                           self.cloud_size.name)
135         self.cloud_node = self._cloud.create_node(self.cloud_size,
136                                                   self.arvados_node)
137         self._logger.info("Cloud node %s created.", self.cloud_node.id)
138         self._later.update_arvados_node_properties()
139
140     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
141     def update_arvados_node_properties(self):
142         """Tell Arvados some details about the cloud node.
143
144         Currently we only include size/price from our request, which
145         we already knew before create_cloud_node(), but doing it here
146         gives us an opportunity to provide more detail from
147         self.cloud_node, too.
148         """
149         self.arvados_node['properties']['cloud_node'] = {
150             # Note this 'size' is the node size we asked the cloud
151             # driver to create -- not necessarily equal to the size
152             # reported by the cloud driver for the node that was
153             # created.
154             'size': self.cloud_size.id,
155             'price': self.cloud_size.price,
156         }
157         self.arvados_node = self._arvados.nodes().update(
158             uuid=self.arvados_node['uuid'],
159             body={'properties': self.arvados_node['properties']},
160         ).execute()
161         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
162         self._later.post_create()
163
164     @ComputeNodeStateChangeBase._retry()
165     def post_create(self):
166         self._cloud.post_create_node(self.cloud_node)
167         self._logger.info("%s post-create work done.", self.cloud_node.id)
168         self._finished()
169
170     def stop_if_no_cloud_node(self):
171         if self.cloud_node is not None:
172             return False
173         self.stop()
174         return True
175
176
177 class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
178     """Actor to shut down a compute node.
179
180     This actor simply destroys a cloud node, retrying as needed.
181     """
182     # Reasons for a shutdown to be cancelled.
183     WINDOW_CLOSED = "shutdown window closed"
184     NODE_BROKEN = "cloud failed to shut down broken node"
185
186     def __init__(self, timer_actor, cloud_client, arvados_client, node_monitor,
187                  cancellable=True, retry_wait=1, max_retry_wait=180):
188         # If a ShutdownActor is cancellable, it will ask the
189         # ComputeNodeMonitorActor if it's still eligible before taking each
190         # action, and stop the shutdown process if the node is no longer
191         # eligible.  Normal shutdowns based on job demand should be
192         # cancellable; shutdowns based on node misbehavior should not.
193         super(ComputeNodeShutdownActor, self).__init__(
194             'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
195             retry_wait, max_retry_wait)
196         self._monitor = node_monitor.proxy()
197         self.cloud_node = self._monitor.cloud_node.get()
198         self.cancellable = cancellable
199         self.cancel_reason = None
200         self.success = None
201
202     def on_start(self):
203         self._later.shutdown_node()
204
205     def _arvados_node(self):
206         return self._monitor.arvados_node.get()
207
208     def _finished(self, success_flag=None):
209         if success_flag is not None:
210             self.success = success_flag
211         return super(ComputeNodeShutdownActor, self)._finished()
212
213     def cancel_shutdown(self, reason):
214         self.cancel_reason = reason
215         self._logger.info("Cloud node %s shutdown cancelled: %s.",
216                           self.cloud_node.id, reason)
217         self._finished(success_flag=False)
218
219     def _stop_if_window_closed(orig_func):
220         @functools.wraps(orig_func)
221         def stop_wrapper(self, *args, **kwargs):
222             if (self.cancellable and
223                   (not self._monitor.shutdown_eligible().get())):
224                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
225                 return None
226             else:
227                 return orig_func(self, *args, **kwargs)
228         return stop_wrapper
229
230     @_stop_if_window_closed
231     @ComputeNodeStateChangeBase._retry()
232     def shutdown_node(self):
233         if not self._cloud.destroy_node(self.cloud_node):
234             if self._cloud.broken(self.cloud_node):
235                 self._later.cancel_shutdown(self.NODE_BROKEN)
236             else:
237                 # Force a retry.
238                 raise cloud_types.LibcloudError("destroy_node failed")
239         self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
240         arv_node = self._arvados_node()
241         if arv_node is None:
242             self._finished(success_flag=True)
243         else:
244             self._later.clean_arvados_node(arv_node)
245
246     @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
247     def clean_arvados_node(self, arvados_node):
248         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
249         self._finished(success_flag=True)
250
251     # Make the decorator available to subclasses.
252     _stop_if_window_closed = staticmethod(_stop_if_window_closed)
253
254
255 class ComputeNodeUpdateActor(config.actor_class):
256     """Actor to dispatch one-off cloud management requests.
257
258     This actor receives requests for small cloud updates, and
259     dispatches them to a real driver.  ComputeNodeMonitorActors use
260     this to perform maintenance tasks on themselves.  Having a
261     dedicated actor for this gives us the opportunity to control the
262     flow of requests; e.g., by backing off when errors occur.
263
264     This actor is most like a "traditional" Pykka actor: there's no
265     subscribing, but instead methods return real driver results.  If
266     you're interested in those results, you should get them from the
267     Future that the proxy method returns.  Be prepared to handle exceptions
268     from the cloud driver when you do.
269     """
270     def __init__(self, cloud_factory, max_retry_wait=180):
271         super(ComputeNodeUpdateActor, self).__init__()
272         self._cloud = cloud_factory()
273         self.max_retry_wait = max_retry_wait
274         self.error_streak = 0
275         self.next_request_time = time.time()
276
277     def _throttle_errors(orig_func):
278         @functools.wraps(orig_func)
279         def throttle_wrapper(self, *args, **kwargs):
280             throttle_time = self.next_request_time - time.time()
281             if throttle_time > 0:
282                 time.sleep(throttle_time)
283             self.next_request_time = time.time()
284             try:
285                 result = orig_func(self, *args, **kwargs)
286             except Exception as error:
287                 self.error_streak += 1
288                 self.next_request_time += min(2 ** self.error_streak,
289                                               self.max_retry_wait)
290                 raise
291             else:
292                 self.error_streak = 0
293                 return result
294         return throttle_wrapper
295
296     @_throttle_errors
297     def sync_node(self, cloud_node, arvados_node):
298         return self._cloud.sync_node(cloud_node, arvados_node)
299
300
301 class ComputeNodeMonitorActor(config.actor_class):
302     """Actor to manage a running compute node.
303
304     This actor gets updates about a compute node's cloud and Arvados records.
305     It uses this information to notify subscribers when the node is eligible
306     for shutdown.
307     """
308     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
309                  cloud_fqdn_func, timer_actor, update_actor, cloud_client,
310                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
311                  boot_fail_after=1800
312     ):
313         super(ComputeNodeMonitorActor, self).__init__()
314         self._later = self.actor_ref.proxy()
315         self._logger = logging.getLogger('arvnodeman.computenode')
316         self._last_log = None
317         self._shutdowns = shutdown_timer
318         self._cloud_node_fqdn = cloud_fqdn_func
319         self._timer = timer_actor
320         self._update = update_actor
321         self._cloud = cloud_client
322         self.cloud_node = cloud_node
323         self.cloud_node_start_time = cloud_node_start_time
324         self.poll_stale_after = poll_stale_after
325         self.node_stale_after = node_stale_after
326         self.boot_fail_after = boot_fail_after
327         self.subscribers = set()
328         self.arvados_node = None
329         self._later.update_arvados_node(arvados_node)
330         self.last_shutdown_opening = None
331         self._later.consider_shutdown()
332
333     def subscribe(self, subscriber):
334         self.subscribers.add(subscriber)
335
336     def _debug(self, msg, *args):
337         if msg == self._last_log:
338             return
339         self._last_log = msg
340         self._logger.debug(msg, *args)
341
342     def in_state(self, *states):
343         # Return a boolean to say whether or not our Arvados node record is in
344         # one of the given states.  If state information is not
345         # available--because this node has no Arvados record, the record is
346         # stale, or the record has no state information--return None.
347         if (self.arvados_node is None) or not timestamp_fresh(
348               arvados_node_mtime(self.arvados_node), self.node_stale_after):
349             return None
350         state = self.arvados_node['crunch_worker_state']
351         if not state:
352             return None
353         result = state in states
354         if state == 'idle':
355             result = result and not self.arvados_node['job_uuid']
356         return result
357
358     def shutdown_eligible(self):
359         if not self._shutdowns.window_open():
360             return False
361         if self.arvados_node is None:
362             # Node is unpaired.
363             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
364             return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
365         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
366         if missing and self._cloud.broken(self.cloud_node):
367             # Node is paired, but Arvados says it is missing and the cloud says the node
368             # is in an error state, so shut it down.
369             return True
370         if missing is None and self._cloud.broken(self.cloud_node):
371             self._logger.warning(
372                 "cloud reports broken node, but paired node %s never pinged "
373                 "(bug?) -- skipped check for node_stale_after",
374                 self.arvados_node['uuid'])
375         return self.in_state('idle')
376
377     def consider_shutdown(self):
378         next_opening = self._shutdowns.next_opening()
379         if self.shutdown_eligible():
380             self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
381             _notify_subscribers(self._later, self.subscribers)
382         elif self._shutdowns.window_open():
383             self._debug("Node %s shutdown window open but node busy.",
384                         self.cloud_node.id)
385         elif self.last_shutdown_opening != next_opening:
386             self._debug("Node %s shutdown window closed.  Next at %s.",
387                         self.cloud_node.id, time.ctime(next_opening))
388             self._timer.schedule(next_opening, self._later.consider_shutdown)
389             self.last_shutdown_opening = next_opening
390
391     def offer_arvados_pair(self, arvados_node):
392         first_ping_s = arvados_node.get('first_ping_at')
393         if (self.arvados_node is not None) or (not first_ping_s):
394             return None
395         elif ((arvados_node['ip_address'] in self.cloud_node.private_ips) and
396               (arvados_timestamp(first_ping_s) >= self.cloud_node_start_time)):
397             self._later.update_arvados_node(arvados_node)
398             return self.cloud_node.id
399         else:
400             return None
401
402     def update_cloud_node(self, cloud_node):
403         if cloud_node is not None:
404             self.cloud_node = cloud_node
405             self._later.consider_shutdown()
406
407     def update_arvados_node(self, arvados_node):
408         # If the cloud node's FQDN doesn't match what's in the Arvados node
409         # record, make them match.
410         # This method is a little unusual in the way it just fires off the
411         # request without checking the result or retrying errors.  That's
412         # because this update happens every time we reload the Arvados node
413         # list: if a previous sync attempt failed, we'll see that the names
414         # are out of sync and just try again.  ComputeNodeUpdateActor has
415         # the logic to throttle those effective retries when there's trouble.
416         if arvados_node is not None:
417             self.arvados_node = arvados_node
418             if (self._cloud_node_fqdn(self.cloud_node) !=
419                   arvados_node_fqdn(self.arvados_node)):
420                 self._update.sync_node(self.cloud_node, self.arvados_node)
421             self._later.consider_shutdown()