2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
14 from ..config import CLOUD_ERRORS
15 from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
# Timestamp format used by the Arvados API server: UTC, whole seconds,
# trailing literal 'Z' (e.g. "2017-02-14T18:34:56Z").
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Captures an optional fractional-seconds suffix (e.g. ".123456" in
# "...:56.123456Z") so it can be parsed separately from the whole seconds.
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Build the fully qualified domain name for an Arvados node record.

    Uses the record's 'hostname' field when it is set and non-empty,
    otherwise falls back to default_hostname, and joins it with the
    record's 'domain' field.
    """
    name = arvados_node.get('hostname')
    if not name:
        name = default_hostname
    return '{}.{}'.format(name, arvados_node['domain'])
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' time as a Unix timestamp."""
    modified_at = node['modified_at']
    return arvados_timestamp(modified_at)
def arvados_timestamp(timestr):
    """Parse an Arvados API timestamp string into a Unix timestamp (float).

    Accepts timestamps in ARVADOS_TIMEFMT form, with or without a
    fractional-seconds component (e.g. "2017-02-14T18:34:56.123456Z").
    Fractional seconds are split off before strptime (which cannot parse
    them) and added back to the result, so sub-second precision is kept.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        # No fractional part in the input.
        subsecs = 0.0
    else:
        subsecs = float(subsec_match.group(1))
        # Strip the fractional part, restoring the trailing 'Z'.
        timestr = timestr[:subsec_match.start()] + 'Z'
    # Append an explicit zone name so %Z anchors the parse to UTC, then
    # convert the UTC struct_time to an epoch value with timegm.
    return calendar.timegm(time.strptime(timestr + 'UTC',
                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
def timestamp_fresh(timestamp, fresh_time):
    """Return True if `timestamp` is less than `fresh_time` seconds old."""
    age = time.time() - timestamp
    return age < fresh_time
def arvados_node_missing(arvados_node, fresh_time):
    """Indicate if the cloud node corresponding to the arvados
    node is missing.

    If True, this means the node has not pinged the API server within the
    timeout period.  If False, the ping is up to date.  If the node has
    never pinged, returns None (a tri-state result: callers must check
    for None explicitly rather than relying on truthiness).
    """
    if arvados_node["last_ping_at"] is None:
        # Never pinged: neither fresh nor stale.
        return None
    else:
        return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
class RetryMixin(object):
    """Retry decorator for a method that makes remote requests.

    Use this function to decorate method, and pass in a tuple of exceptions to
    catch. If the original method raises a known cloud driver error, or any of
    the given exception types, this decorator will either go into a
    sleep-and-retry loop with exponential backoff either by sleeping (if
    self._timer is None) or by scheduling retries of the method (if self._timer
    is set).
    """

    # NOTE(review): this excerpt is elided -- the retry loop header, the
    # `try:` statements, most `should_retry` assignments, and the
    # return/raise paths are not visible here.  Comments below describe
    # only the visible code; confirm details against the full file.

    def __init__(self, retry_wait, max_retry_wait, logger, cloud, timer=None):
        # Never retry faster than once per second; keep the maximum wait
        # at least as large as the minimum.
        self.min_retry_wait = max(1, retry_wait)
        self.max_retry_wait = max(self.min_retry_wait, max_retry_wait)
        # Current backoff interval in seconds; mutated by retry_wrapper.
        self.retry_wait = retry_wait

    def _retry(errors=()):
        # Decorator factory: `errors` is a tuple of additional exception
        # types that should be treated as retryable.
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                        # Invoke the wrapped method; the handlers below
                        # decide whether a failure is retryable.
                        ret = orig_func(self, *args, **kwargs)
                    except RateLimitReachedError as error:
                        # If retry-after is zero, continue with exponential
                        # backoff; otherwise honor the server-supplied delay.
                        if error.retry_after != 0:
                            self.retry_wait = error.retry_after
                    except BaseHTTPError as error:
                        if error.headers and error.headers.get("retry-after"):
                                retry_after = int(error.headers["retry-after"])
                                # If retry-after is zero, continue with
                                # exponential backoff.
                                    self.retry_wait = retry_after
                                    # Non-integer Retry-After header: warn
                                    # and fall through to default handling.
                                    "Unrecognizable Retry-After header: %r",
                                    error.headers["retry-after"],
                        # HTTP 429 (rate limit) and 5xx (server errors) are
                        # treated as retryable.
                        if error.code == 429 or error.code >= 500:
                    except CLOUD_ERRORS as error:
                    except errors as error:
                    except Exception as error:
                        # As a libcloud workaround for drivers that don't use
                        # typed exceptions, consider bare Exception() objects
                        # retryable.
                        should_retry = type(error) is Exception
                        # Reset backoff -- presumably the success path; the
                        # surrounding branch header is elided (TODO confirm).
                        self.retry_wait = self.min_retry_wait

                    # Only got here if an exception was caught. Now determine what to do about it.
                        # Non-retryable error: reset backoff and warn before
                        # the (elided) re-raise.
                        self.retry_wait = self.min_retry_wait
                        self._logger.warning(
                            "Re-raising error (no retry): %s",
                            error, exc_info=error)

                    # Retry wait out of bounds?
                    if self.retry_wait < self.min_retry_wait:
                        self.retry_wait = self.min_retry_wait
                    elif self.retry_wait > self.max_retry_wait:
                        self.retry_wait = self.max_retry_wait

                    self._logger.warning(
                        "Client error: %s - %s %s seconds",
                        "scheduling retry in" if self._timer else "sleeping",
                        # Timer-based retry: schedule the method to run again
                        # after the backoff instead of blocking this thread.
                        start_time = time.time()
                        # reschedule to be called again
                        self._timer.schedule(start_time + self.retry_wait,
                        # No timer: block this thread for the backoff interval.
                        time.sleep(self.retry_wait)
                    # Exponential backoff; cap argument elided from this view.
                    self.retry_wait = min(self.retry_wait * 2,
                        # expect to be called again by timer so don't loop
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.
    """
    def __init__(self, start_time, shutdown_windows):
        # The implementation is easiest if we have an even number of windows,
        # because then windows always alternate between open and closed.
        # Rig that up: calculate the first shutdown window based on what's
        # passed in.  Then, if we were given an odd number of windows, merge
        # that first window into the last one, since they both represent
        # closed state.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            # Even count remaining: fold the first (closed) window into the
            # last one instead of appending, keeping the alternation intact.
            shutdown_windows[-1] += first_window
        # Durations converted from minutes to seconds, cycled forever.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)

    def _advance_opening(self):
        # Consume window durations until the recorded next opening is in
        # the future, keeping _open_start/_open_for in step.
        while self._next_opening < time.time():
            self._open_start = self._next_opening
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp when the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if a shutdown window is open right now."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._open_for