15964: Remove qr1hi from a few more places. Delete unused includes.
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import calendar
9 import functools
10 import itertools
11 import re
12 import time
13
14 from ..config import CLOUD_ERRORS
15 from ..status import tracker
16 from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
17
18 ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
19 ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
20
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Return the fully-qualified domain name for an Arvados node record.

    Uses the record's 'hostname' field when it is non-empty, otherwise
    falls back to default_hostname, and appends the record's 'domain'.
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    return '.'.join([host, arvados_node['domain']])
24
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' field as a Unix timestamp."""
    modified = node['modified_at']
    return arvados_timestamp(modified)
27
def arvados_timestamp(timestr):
    """Convert an Arvados API timestamp string to a Unix timestamp (float).

    Accepts times like '2017-01-01T12:00:00Z' with an optional fractional
    seconds part (e.g. '...:00.25Z'); the fraction is preserved in the
    returned float.
    """
    fraction = 0.0
    match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if match is not None:
        # Strip the fractional part so strptime can parse the rest,
        # and add it back at the end.
        fraction = float(match.group(1))
        timestr = timestr[:match.start()] + 'Z'
    whole = calendar.timegm(time.strptime(timestr + 'UTC',
                                          ARVADOS_TIMEFMT + '%Z'))
    return whole + fraction
37
def timestamp_fresh(timestamp, fresh_time):
    """Return True if *timestamp* is less than *fresh_time* seconds old."""
    age = time.time() - timestamp
    return age < fresh_time
40
def arvados_node_missing(arvados_node, fresh_time):
    """Indicate if the cloud node corresponding to the arvados
    node is "missing".

    Returns True when the node has not pinged the API server within
    fresh_time seconds, False when the ping is up to date, and None
    when the node has never pinged at all.
    """
    last_ping = arvados_node["last_ping_at"]
    if last_ping is None:
        return None
    return not timestamp_fresh(arvados_timestamp(last_ping), fresh_time)
53
class RetryMixin(object):
    """Retry decorator for a method that makes remote requests.

    Decorate a method with _retry(), passing a tuple of extra exception
    types to catch.  If the decorated method raises a known cloud driver
    error, or any of the given exception types, the wrapper goes into a
    retry loop with exponential backoff, either by sleeping (when
    self._timer is None) or by scheduling a future call of the method
    via the timer actor (when self._timer is set).

    Classes mixing this in must provide self._later (a proxy exposing
    the decorated methods) when a timer is used -- see the
    self._timer.schedule() call below.
    """
    def __init__(self, retry_wait, max_retry_wait, logger, cloud, timer=None):
        # Clamp configured waits: minimum wait is at least 1 second,
        # and the maximum can never be below the minimum.
        self.min_retry_wait = max(1, retry_wait)
        self.max_retry_wait = max(self.min_retry_wait, max_retry_wait)
        # Current backoff delay; grows exponentially on failures and is
        # reset to min_retry_wait on success (see retry_wrapper below).
        self.retry_wait = retry_wait
        self._logger = logger
        self._cloud = cloud
        self._timer = timer

    @staticmethod
    def _retry(errors=()):
        """Build a decorator that retries the wrapped method on failure.

        errors: tuple of additional exception types that should trigger
        a retry, beyond rate limits, retryable HTTP errors (429/5xx),
        and CLOUD_ERRORS.
        """
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    should_retry = False
                    try:
                        ret = orig_func(self, *args, **kwargs)
                    except RateLimitReachedError as error:
                        # If retry-after is zero, continue with exponential
                        # backoff.
                        if error.retry_after != 0:
                            self.retry_wait = error.retry_after
                        should_retry = True
                    except BaseHTTPError as error:
                        # Honor a Retry-After header when the server sends
                        # one and it parses as an integer.
                        if error.headers and error.headers.get("retry-after"):
                            try:
                                retry_after = int(error.headers["retry-after"])
                                # If retry-after is zero, continue with
                                # exponential backoff.
                                if retry_after != 0:
                                    self.retry_wait = retry_after
                                should_retry = True
                            except ValueError:
                                self._logger.warning(
                                    "Unrecognizable Retry-After header: %r",
                                    error.headers["retry-after"],
                                    exc_info=error)
                        # Throttling (429) and server-side errors (5xx) are
                        # retryable even without a Retry-After header.
                        if error.code == 429 or error.code >= 500:
                            should_retry = True
                    except CLOUD_ERRORS as error:
                        tracker.counter_add('cloud_errors')
                        should_retry = True
                    except errors as error:
                        should_retry = True
                    except Exception as error:
                        # As a libcloud workaround for drivers that don't use
                        # typed exceptions, consider bare Exception() objects
                        # retryable.
                        if type(error) is Exception:
                            tracker.counter_add('cloud_errors')
                            should_retry = True
                    else:
                        # No exception: reset the backoff and return.
                        self.retry_wait = self.min_retry_wait
                        return ret

                    # Only got here if an exception was caught.  Now determine what to do about it.
                    # NOTE(review): using `error` here, and the bare `raise`
                    # below, rely on Python 2 semantics where the bound
                    # exception name and sys.exc_info() survive past the
                    # except block.  Under Python 3 the name `error` is
                    # deleted when each handler exits -- confirm this module
                    # only targets Python 2.
                    if not should_retry:
                        self.retry_wait = self.min_retry_wait
                        self._logger.warning(
                            "Re-raising error (no retry): %s",
                            error, exc_info=error)
                        raise

                    # Retry wait out of bounds?
                    if self.retry_wait < self.min_retry_wait:
                        self.retry_wait = self.min_retry_wait
                    elif self.retry_wait > self.max_retry_wait:
                        self.retry_wait = self.max_retry_wait

                    self._logger.warning(
                        "Client error: %s - %s %s seconds",
                        error,
                        "scheduling retry in" if self._timer else "sleeping",
                        self.retry_wait,
                        exc_info=error)

                    if self._timer:
                        start_time = time.time()
                        # reschedule to be called again
                        self._timer.schedule(start_time + self.retry_wait,
                                             getattr(self._later,
                                                     orig_func.__name__),
                                             *args, **kwargs)
                    else:
                        # sleep on it.
                        time.sleep(self.retry_wait)

                    # Exponential backoff for the next attempt, capped at
                    # max_retry_wait.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                    if self._timer:
                        # expect to be called again by timer so don't loop
                        return

            return retry_wrapper
        return decorator
161
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.
    """
    def __init__(self, start_time, shutdown_windows):
        # Normalize to an even number of windows so they strictly
        # alternate between closed and open.  The first window (closed)
        # fixes when the first open window begins; if the total count is
        # odd, fold that first window into the last one, since both
        # represent closed state.
        closed_first = shutdown_windows[0]
        remainder = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * closed_first)
        if len(remainder) % 2 == 0:
            remainder[-1] += closed_first
        else:
            remainder.append(closed_first)
        # Cycle endlessly through the window lengths, in seconds.
        self.shutdown_windows = itertools.cycle(
            [60 * mins for mins in remainder])
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)

    def _advance_opening(self):
        # Walk the window cycle forward until the recorded opening is
        # in the future.
        while self._next_opening < time.time():
            self._open_start = self._next_opening
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp when the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if a shutdown window is open right now."""
        self._advance_opening()
        elapsed = time.time() - self._open_start
        return 0 < elapsed < self._open_for