Merge branch '8784-dir-listings'
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import calendar
9 import functools
10 import itertools
11 import re
12 import time
13
14 from ..config import CLOUD_ERRORS
15 from libcloud.common.exceptions import BaseHTTPError
16
# strptime layout for the time strings parsed by arvados_timestamp():
# whole seconds followed by a literal 'Z', e.g. '2015-01-02T03:04:05Z'.
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Matches a fractional-seconds part ('.123') sitting right before the
# trailing 'Z'.  arvados_timestamp() uses it to cut the fraction out, since
# ARVADOS_TIMEFMT has no place for it.
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
19
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Return the fully qualified domain name for an Arvados node record.

    The record's 'hostname' is used when it holds a truthy value; a missing,
    None or empty hostname is replaced by default_hostname.  The record's
    'domain' entry is required (KeyError otherwise) and is joined to the
    host part with a dot.
    """
    host_part = arvados_node.get('hostname')
    if not host_part:
        host_part = default_hostname
    domain_part = arvados_node['domain']
    return '{0}.{1}'.format(host_part, domain_part)
23
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' time as a Unix timestamp."""
    modified_at = node['modified_at']
    return arvados_timestamp(modified_at)
26
def arvados_timestamp(timestr):
    """Convert an Arvados time string to whole seconds since the epoch.

    timestr must follow ARVADOS_TIMEFMT ('YYYY-MM-DDTHH:MM:SSZ'), optionally
    with fractional seconds just before the trailing 'Z'.  The fraction is
    discarded (truncated, not rounded), so the result is the integer that
    calendar.timegm() produces for the whole-second part.

    Raises ValueError (from time.strptime) when the string does not match
    the expected layout.
    """
    # ARVADOS_TIMEFMT cannot parse '.NNN', so cut the fraction out and put
    # back the 'Z' that the format string expects.
    # NOTE(review): sub-second precision is deliberately not added to the
    # result here, so callers keep receiving an int -- confirm that nobody
    # needs the fraction.
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is not None:
        timestr = timestr[:subsec_match.start()] + 'Z'
    # calendar.timegm() reads the parsed struct_time as UTC.
    return calendar.timegm(time.strptime(timestr + 'UTC',
                                         ARVADOS_TIMEFMT + '%Z'))
36
def timestamp_fresh(timestamp, fresh_time):
    """Tell whether timestamp is strictly less than fresh_time seconds old.

    Both values are in seconds; the age is measured against time.time().
    """
    age = time.time() - timestamp
    return age < fresh_time
39
def arvados_node_missing(arvados_node, fresh_time):
    """Report whether the cloud node behind an Arvados node is "missing".

    Returns None when the record's 'last_ping_at' is None, i.e. the node
    has never pinged.  Otherwise returns True when the last ping is at
    least fresh_time seconds old (the node has gone quiet), and False when
    the ping is still fresh.
    """
    last_ping = arvados_node["last_ping_at"]
    if last_ping is None:
        return None
    pinged_recently = timestamp_fresh(arvados_timestamp(last_ping),
                                      fresh_time)
    return not pinged_recently
52
class RetryMixin(object):
    """Mixin providing a retry decorator for methods that make remote requests.

    Decorate a method with ``RetryMixin._retry(errors)``, where ``errors``
    is a tuple of extra exception types to treat as retryable.  When the
    decorated method raises a retryable error, the wrapper waits
    ``self.retry_wait`` seconds and tries again, doubling the wait after
    every attempt up to ``self.max_retry_wait``:

    * if ``self._timer`` is falsy, the wrapper sleeps and loops in place;
    * otherwise it asks the timer to schedule
      ``getattr(self._later, <method name>)`` with the same arguments and
      returns None right away.  ``self._later`` is not set by this mixin;
      the class using it must provide that attribute.

    Retryable errors are: a BaseHTTPError that carries an integer
    Retry-After header or has a status code of 429 or >= 500, anything in
    CLOUD_ERRORS, anything in ``errors``, and bare ``Exception`` instances
    (not subclasses).  Any other Exception is logged and re-raised.  A
    successful call resets ``self.retry_wait`` to its minimum.
    """
    def __init__(self, retry_wait, max_retry_wait,
                 logger, cloud, timer=None):
        """Store the retry configuration.

        retry_wait -- initial (and minimum) wait between attempts, seconds
        max_retry_wait -- upper bound for the wait, seconds
        logger -- object whose warning() method receives retry messages
        cloud -- stored as self._cloud; not used by the mixin itself
        timer -- optional object with a schedule() method; when given,
                 retries are scheduled through it instead of sleeping
        """
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self._logger = logger
        self._cloud = cloud
        self._timer = timer

    @staticmethod
    def _retry(errors=()):
        """Return a decorator that retries the wrapped method on errors.

        errors -- exception types, in addition to the built-in set described
                  on the class, after which the call should be retried.
        """
        def http_error_retryable(actor, error):
            # Decide whether a BaseHTTPError deserves another attempt, and
            # honour the server's Retry-After value as the next wait.
            # The parsing lives in its own function so that the ValueError
            # swallowed here is never the exception a bare ``raise`` in the
            # caller's handler picks up.
            retryable = False
            if error.headers and error.headers.get("retry-after"):
                try:
                    wait = int(error.headers["retry-after"])
                except ValueError:
                    # Not a plain number of seconds: ignore the header.
                    pass
                else:
                    if wait < 0 or wait > actor.max_retry_wait:
                        wait = actor.max_retry_wait
                    actor.retry_wait = wait
                    retryable = True
            if error.code == 429 or error.code >= 500:
                retryable = True
            return retryable

        def give_up(actor, error):
            # Reset the backoff and report an error that will be re-raised.
            actor.retry_wait = actor.min_retry_wait
            actor._logger.warning(
                "Re-raising error (no retry): %s",
                error, exc_info=error)

        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    # ``caught`` keeps the retryable exception available
                    # after the handlers finish: Python 3 unbinds the
                    # ``as error`` name at the end of each except clause.
                    # Non-retryable errors are re-raised from inside their
                    # handler, where a bare ``raise`` is always valid.
                    try:
                        ret = orig_func(self, *args, **kwargs)
                    except BaseHTTPError as error:
                        if not http_error_retryable(self, error):
                            give_up(self, error)
                            raise
                        caught = error
                    except CLOUD_ERRORS as error:
                        caught = error
                    except errors as error:
                        caught = error
                    except Exception as error:
                        # As a libcloud workaround for drivers that don't
                        # use typed exceptions, consider bare Exception()
                        # objects retryable.
                        if type(error) is not Exception:
                            give_up(self, error)
                            raise
                        caught = error
                    else:
                        # No exception: reset the backoff and hand back the
                        # result.
                        self.retry_wait = self.min_retry_wait
                        return ret

                    # Only reached when a retryable exception was caught.
                    self._logger.warning(
                        "Client error: %s - %s %s seconds",
                        caught,
                        "scheduling retry in" if self._timer else "sleeping",
                        self.retry_wait,
                        exc_info=caught)
                    # Drop the reference so the exception (and the frames
                    # its traceback holds) is not kept alive by this local.
                    caught = None

                    if self._timer:
                        start_time = time.time()
                        # Reschedule this method to be called again later.
                        self._timer.schedule(start_time + self.retry_wait,
                                             getattr(self._later,
                                                     orig_func.__name__),
                                             *args, **kwargs)
                    else:
                        # Sleep on it, then loop for the next attempt.
                        time.sleep(self.retry_wait)

                    # Exponential backoff, capped at max_retry_wait.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                    if self._timer:
                        # The timer will call us again, so don't loop.
                        return

            return retry_wrapper
        return decorator
141
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.

    shutdown_windows must hold at least two durations (one closed, one
    open); with fewer, construction raises IndexError.  After the last
    duration the schedule repeats from the first one.
    """
    def __init__(self, start_time, shutdown_windows):
        """Set up the schedule.

        start_time -- Unix timestamp (seconds) of when the node started
        shutdown_windows -- sequence of durations in minutes, alternating
                            closed, open, closed, ... starting with closed
        """
        # The implementation is easiest if the repeating part of the
        # schedule has an even number of windows, because then it always
        # alternates open, closed, open, closed.  Rig that up: the first
        # (closed) window only fixes when the first opening happens and is
        # then moved to the end of the cycle.  If the remaining list already
        # ends on a closed window, the first window is merged into it, since
        # they both represent closed state; otherwise it is appended as the
        # closing window of the cycle.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            shutdown_windows[-1] += first_window
        # Endless open/closed durations, converted to seconds.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        # _open_start/_open_for describe the window window_open() tests;
        # _next_opening/_next_open_for describe the upcoming one.  Before
        # the first opening has passed, both pairs refer to that opening.
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)
        self._next_open_for = self._open_for

    def _advance_opening(self):
        """Step the schedule forward until the next opening is in the future."""
        while self._next_opening < time.time():
            # The opening we were waiting for has started: it becomes the
            # current window, with its own duration.  Keeping the upcoming
            # window's duration separate matters when open windows differ
            # in length.
            self._open_start = self._next_opening
            self._open_for = self._next_open_for
            # The following opening starts after this open window plus the
            # closed window behind it.
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._next_open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp at which the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if the current time falls inside a shutdown window."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._open_for