2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 from __future__ import absolute_import, print_function
14 from ..config import CLOUD_ERRORS
15 from ..status import tracker
16 from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
# Timestamp format used by the Arvados API server: UTC with one-second
# resolution, e.g. '2017-02-14T18:34:33Z'.
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Captures an optional fractional-seconds suffix (the '.123' in
# '...:33.123Z') so it can be handled separately from the whole seconds,
# which time.strptime cannot parse.
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Return the fully qualified domain name for an Arvados node record.

    Uses the record's 'hostname' field when it is truthy; otherwise falls
    back to default_hostname.  The record's 'domain' field is appended.
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    return '.'.join((host, arvados_node['domain']))
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' field as a Unix timestamp."""
    modified_at = node['modified_at']
    return arvados_timestamp(modified_at)
def arvados_timestamp(timestr):
    """Convert an Arvados API timestamp string to a Unix timestamp.

    Accepts timestamps with or without a fractional-seconds component
    (e.g. '2017-02-14T18:34:33Z' or '2017-02-14T18:34:33.12Z').  Returns
    seconds since the epoch, with the fractional part (if any) added in.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        subsecs = .0
    else:
        subsecs = float(subsec_match.group(1))
        # Strip the fractional part so strptime (which cannot parse it)
        # sees a plain '...SSZ' timestamp; the fraction is re-added below.
        timestr = timestr[:subsec_match.start()] + 'Z'
    return calendar.timegm(time.strptime(timestr + 'UTC',
                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
def timestamp_fresh(timestamp, fresh_time):
    """Return True if `timestamp` is fewer than `fresh_time` seconds old."""
    age = time.time() - timestamp
    return age < fresh_time
def arvados_node_missing(arvados_node, fresh_time):
    """Indicate if the cloud node corresponding to the arvados
       node is "missing".

    If True, this means the node has not pinged the API server within the timeout
    period.  If False, the ping is up to date.  If the node has never pinged,
    returns None.
    """
    if arvados_node["last_ping_at"] is None:
        # Never pinged: neither fresh nor stale, per the docstring contract.
        return None
    else:
        return not timestamp_fresh(
            arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
class RetryMixin(object):
    """Retry decorator for a method that makes remote requests.

    Use this class's _retry() to decorate a method, and pass in a tuple of
    exceptions to catch.  If the original method raises a known cloud driver
    error, or any of the given exception types, this decorator will go into a
    sleep-and-retry loop with exponential backoff either by sleeping (if
    self._timer is None) or by scheduling retries of the method (if
    self._timer is a timer scheduler object).
    """
    def __init__(self, retry_wait, max_retry_wait, logger, cloud, timer=None):
        # Clamp the starting wait to at least 1 second, and make sure the
        # maximum is never below the minimum.
        self.min_retry_wait = max(1, retry_wait)
        self.max_retry_wait = max(self.min_retry_wait, max_retry_wait)
        self.retry_wait = retry_wait
        self._logger = logger
        self._cloud = cloud
        self._timer = timer

    @staticmethod
    def _retry(errors=()):
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    should_retry = False
                    # Bind the caught exception to `error` explicitly in
                    # each handler: Python 3 unbinds the `except ... as`
                    # name when the handler exits, so using it afterwards
                    # would raise NameError.
                    error = None
                    try:
                        ret = orig_func(self, *args, **kwargs)
                    except RateLimitReachedError as e:
                        error = e
                        # If retry-after is zero, continue with exponential
                        # backoff.
                        if e.retry_after != 0:
                            self.retry_wait = e.retry_after
                        should_retry = True
                    except BaseHTTPError as e:
                        error = e
                        if e.headers and e.headers.get("retry-after"):
                            try:
                                retry_after = int(e.headers["retry-after"])
                                # If retry-after is zero, continue with
                                # exponential backoff.
                                if retry_after != 0:
                                    self.retry_wait = retry_after
                                should_retry = True
                            except ValueError:
                                self._logger.warning(
                                    "Unrecognizable Retry-After header: %r",
                                    e.headers["retry-after"],
                                    exc_info=e)
                        # Rate-limit and server-side errors are retryable.
                        if e.code == 429 or e.code >= 500:
                            should_retry = True
                    except CLOUD_ERRORS as e:
                        error = e
                        tracker.counter_add('cloud_errors')
                        should_retry = True
                    except errors as e:
                        error = e
                        should_retry = True
                    except Exception as e:
                        error = e
                        # As a libcloud workaround for drivers that don't use
                        # typed exceptions, consider bare Exception() objects
                        # retryable; anything more specific is re-raised below.
                        if type(e) is Exception:
                            tracker.counter_add('cloud_errors')
                            should_retry = True
                    else:
                        # Success: reset the backoff for the next failure.
                        self.retry_wait = self.min_retry_wait
                        return ret

                    # Only got here if an exception was caught.  Now determine what to do about it.
                    if not should_retry:
                        self.retry_wait = self.min_retry_wait
                        self._logger.warning(
                            "Re-raising error (no retry): %s",
                            error, exc_info=error)
                        raise error

                    # Retry wait out of bounds?
                    if self.retry_wait < self.min_retry_wait:
                        self.retry_wait = self.min_retry_wait
                    elif self.retry_wait > self.max_retry_wait:
                        self.retry_wait = self.max_retry_wait

                    self._logger.warning(
                        "Client error: %s - %s %s seconds",
                        error,
                        "scheduling retry in" if self._timer else "sleeping",
                        self.retry_wait,
                        exc_info=error)

                    if self._timer:
                        start_time = time.time()
                        # reschedule to be called again
                        self._timer.schedule(start_time + self.retry_wait,
                                             getattr(self, orig_func.__name__),
                                             *args, **kwargs)
                    else:
                        time.sleep(self.retry_wait)

                    # Exponential backoff, capped at max_retry_wait.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                    if self._timer:
                        # expect to be called again by timer so don't loop
                        return

            return retry_wrapper
        return decorator
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.
    """
    def __init__(self, start_time, shutdown_windows):
        # The implementation is easiest if we have an even number of windows,
        # because then windows always alternate between open and closed.
        # Rig that up: calculate the first shutdown window based on what's
        # passed in.  Then, if we were given an odd number of windows, merge
        # that first window into the last one, since they both represent
        # closed state.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            # NOTE: exactly one of these two branches must run; without the
            # `else` both would execute and corrupt the window schedule.
            shutdown_windows[-1] += first_window
        # Durations cycle forever, converted from minutes to seconds.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)

    def _advance_opening(self):
        # Roll the schedule forward until the recorded next opening is in
        # the future relative to the current wall-clock time.
        while self._next_opening < time.time():
            self._open_start = self._next_opening
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp when the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if a shutdown window is open right now."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._open_for