Merge branch '12446-dispatcher-query' closes #12446
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 from __future__ import absolute_import, print_function
7
8 import calendar
9 import functools
10 import itertools
11 import re
12 import time
13
14 from ..config import CLOUD_ERRORS
15 from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
16
# strptime() layout of an Arvados timestamp at whole-second precision,
# including the literal trailing "Z".
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Matches a fractional-seconds part (such as ".123456") sitting directly
# before the trailing "Z".  Group 1 is the fraction, leading dot included.
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
19
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Build the fully qualified domain name for an Arvados node record.

    The record's 'hostname' value is used when it is present and truthy;
    otherwise default_hostname stands in for it.  The record must have a
    'domain' key (KeyError otherwise).
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    domain = arvados_node['domain']
    return '{}.{}'.format(host, domain)
23
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' time as a Unix timestamp."""
    return arvados_timestamp(timestr=node['modified_at'])
26
def arvados_timestamp(timestr):
    """Convert an Arvados UTC timestamp string to seconds since the epoch.

    timestr must follow ARVADOS_TIMEFMT ('YYYY-MM-DDTHH:MM:SSZ'),
    optionally with a fractional-seconds part right before the 'Z'
    (for example '2017-10-05T14:03:22.123456Z').

    Returns an int for whole-second timestamps.  When a fractional part
    is present it is added to the result, which is then a float; earlier
    the fraction was parsed but silently discarded.

    Raises ValueError (from time.strptime) if timestr does not match the
    expected layout.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        whole_part = timestr
    else:
        # strptime has no portable directive for fractional seconds, so
        # cut the fraction out before parsing and add it back afterwards.
        whole_part = timestr[:subsec_match.start()] + 'Z'
    # Appending 'UTC' and parsing it with %Z pins the parsed struct_time
    # to UTC before timegm() turns it into epoch seconds.
    seconds = calendar.timegm(time.strptime(whole_part + 'UTC',
                                            ARVADOS_TIMEFMT + '%Z'))
    if subsec_match is None:
        return seconds
    return seconds + float(subsec_match.group(1))
36
def timestamp_fresh(timestamp, fresh_time):
    """Tell whether timestamp is less than fresh_time seconds old.

    timestamp is in seconds since the epoch and is compared against the
    current time.time().
    """
    age = time.time() - timestamp
    return age < fresh_time
39
def arvados_node_missing(arvados_node, fresh_time):
    """Report whether the cloud node behind an Arvados node is "missing".

    Returns True when the node's last ping to the API server is at least
    fresh_time seconds old, False when the ping is more recent than
    that, and None when the node has never pinged at all (its
    "last_ping_at" is None).
    """
    last_ping = arvados_node["last_ping_at"]
    if last_ping is None:
        return None
    pinged_recently = timestamp_fresh(arvados_timestamp(last_ping),
                                      fresh_time)
    return not pinged_recently
52
class RetryMixin(object):
    """Mixin providing a retry decorator for methods that make remote requests.

    Decorate a method with RetryMixin._retry(errors), passing a tuple of
    additional exception types to treat as retryable.  If the decorated
    method raises a known cloud driver error (CLOUD_ERRORS), a rate-limit
    or retryable HTTP error, a bare Exception, or any of the given
    exception types, the call is retried with exponential backoff: either
    by sleeping in a loop (if self._timer is None) or by scheduling
    another call of the method through the timer (if self._timer is set).
    Any other Exception is logged and re-raised.
    """
    def __init__(self, retry_wait, max_retry_wait, logger, cloud, timer=None):
        """Set up the retry state used by methods decorated with _retry.

        retry_wait -- initial wait in seconds; also the floor of the
            backoff (but never less than 1).
        max_retry_wait -- ceiling of the backoff (never less than the floor).
        logger -- object whose warning() method receives retry messages.
        cloud -- stored as self._cloud; not used by this class itself.
        timer -- optional object with a schedule(when, callable, *args,
            **kwargs) method.  When set, retries are scheduled through it
            rather than slept on, and the host class must provide
            self._later with an attribute named after the decorated method.
        """
        self.min_retry_wait = max(1, retry_wait)
        self.max_retry_wait = max(self.min_retry_wait, max_retry_wait)
        self.retry_wait = retry_wait
        self._logger = logger
        self._cloud = cloud
        self._timer = timer

    @staticmethod
    def _retry(errors=()):
        """Return a decorator that retries a method on retryable errors.

        errors -- exception type(s) to retry on, in addition to the
            built-in retryable ones.
        """
        def http_error_retryable(self, error):
            # Decide whether a BaseHTTPError deserves a retry, honoring a
            # numeric Retry-After header.  Doing this in its own function
            # keeps the ValueError handling below from disturbing the
            # caller's "current exception" on Python 2.
            retryable = False
            if error.headers and error.headers.get("retry-after"):
                try:
                    retry_after = int(error.headers["retry-after"])
                    # If retry-after is zero, continue with
                    # exponential backoff.
                    if retry_after != 0:
                        self.retry_wait = retry_after
                    retryable = True
                except ValueError:
                    self._logger.warning(
                        "Unrecognizable Retry-After header: %r",
                        error.headers["retry-after"],
                        exc_info=error)
            if error.code == 429 or error.code >= 500:
                retryable = True
            return retryable

        def give_up(self, error):
            # Reset the backoff and log an error that will not be retried.
            # The caller re-raises it from inside its except block.
            self.retry_wait = self.min_retry_wait
            self._logger.warning(
                "Re-raising error (no retry): %s",
                error, exc_info=error)

        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    # Non-retryable errors are re-raised from inside their
                    # handler so that a bare "raise" always has the right
                    # active exception.  Retryable ones are kept in
                    # "caught", because Python 3 unbinds the "as" name
                    # when the handler ends.
                    try:
                        ret = orig_func(self, *args, **kwargs)
                    except RateLimitReachedError as error:
                        # If retry-after is zero, continue with exponential
                        # backoff.
                        if error.retry_after != 0:
                            self.retry_wait = error.retry_after
                        caught = error
                    except BaseHTTPError as error:
                        if not http_error_retryable(self, error):
                            give_up(self, error)
                            raise
                        caught = error
                    except CLOUD_ERRORS as error:
                        caught = error
                    except errors as error:
                        caught = error
                    except Exception as error:
                        # As a libcloud workaround for drivers that don't use
                        # typed exceptions, consider bare Exception() objects
                        # retryable.
                        if type(error) is not Exception:
                            give_up(self, error)
                            raise
                        caught = error
                    else:
                        # No exception: reset the backoff and hand back
                        # the result.
                        self.retry_wait = self.min_retry_wait
                        return ret

                    # Only got here if a retryable exception was caught.
                    # Keep the wait (possibly set from Retry-After) within
                    # the configured bounds.
                    if self.retry_wait < self.min_retry_wait:
                        self.retry_wait = self.min_retry_wait
                    elif self.retry_wait > self.max_retry_wait:
                        self.retry_wait = self.max_retry_wait

                    self._logger.warning(
                        "Client error: %s - %s %s seconds",
                        caught,
                        "scheduling retry in" if self._timer else "sleeping",
                        self.retry_wait,
                        exc_info=caught)

                    if self._timer:
                        start_time = time.time()
                        # reschedule to be called again
                        self._timer.schedule(start_time + self.retry_wait,
                                             getattr(self._later,
                                                     orig_func.__name__),
                                             *args, **kwargs)
                    else:
                        # sleep on it.
                        time.sleep(self.retry_wait)

                    # Exponential backoff for the next failure.
                    self.retry_wait = min(self.retry_wait * 2,
                                          self.max_retry_wait)
                    if self._timer:
                        # expect to be called again by timer so don't loop
                        return

            return retry_wrapper
        return decorator
157
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.

    At least two durations are required; fewer raise IndexError.
    """
    def __init__(self, start_time, shutdown_windows):
        """Set up the window cycle.

        start_time -- node start time in seconds since the epoch.
        shutdown_windows -- sequence of durations in minutes, starting
            with a closed (must not shut down) window and alternating
            with open windows.
        """
        # The implementation is easiest if we have an even number of windows,
        # because then windows always alternate between open and closed.
        # Rig that up: calculate the first shutdown window based on what's
        # passed in.  Then, if we were given an odd number of windows, merge
        # that first window into the last one, since they both represent
        # closed state.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            shutdown_windows[-1] += first_window
        # Endless open, closed, open, closed, ... durations in seconds.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        # _open_start/_open_for describe the most recently started open
        # window; _next_opening/_next_open_for describe the one after it.
        # Until the first window has started, both pairs refer to it.
        self._open_start = self._next_opening
        self._open_for = next(self.shutdown_windows)
        self._next_open_for = self._open_for

    def _advance_opening(self):
        """Step through the cycle until _next_opening is not in the past."""
        while self._next_opening < time.time():
            # The upcoming window has started: it becomes the current one.
            # Its length is kept apart from the length of the window that
            # follows, so window_open() measures the current window with
            # the current window's own duration.
            self._open_start = self._next_opening
            self._open_for = self._next_open_for
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._next_open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the start time (epoch seconds) of the next open window."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if the current time is inside an open window."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._open_for