8206: Refactor _retry into common function wrapper usable by both dispatch and
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import calendar
6 import functools
7 import itertools
8 import re
9 import time
10
# strptime() format for Arvados timestamp strings once any fractional-seconds
# part has been removed, e.g. '2015-01-01T00:00:00Z'.
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Matches a fractional-seconds suffix (a dot followed by digits) that sits
# immediately before the trailing 'Z'; group 1 is the fraction, dot included.
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
13
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Return the fully qualified domain name for an Arvados node record.

    The name is "<hostname>.<domain>".  When the record has no 'hostname'
    key, or its value is falsy (None, empty string), default_hostname is
    used in its place.  The record must have a 'domain' key; a missing one
    raises KeyError.
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    domain = arvados_node['domain']
    return '{0}.{1}'.format(host, domain)
17
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' time as a Unix timestamp.

    The 'modified_at' string is converted with arvados_timestamp().
    """
    modified_at = node['modified_at']
    return arvados_timestamp(modified_at)
20
def arvados_timestamp(timestr):
    """Convert an Arvados UTC timestamp string to seconds since the epoch.

    timestr looks like '2015-01-01T00:00:00Z', optionally with a fractional
    seconds part before the 'Z' ('2015-01-01T00:00:00.250000Z').

    Returns a float.  The fractional seconds, when present, are included in
    the result rather than being discarded.  Raises ValueError (from
    time.strptime) if the string does not match ARVADOS_TIMEFMT.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        subsecs = .0
    else:
        subsecs = float(subsec_match.group(1))
        # strptime() cannot parse the fraction with this format, so cut it
        # out and put the trailing 'Z' back.
        timestr = timestr[:subsec_match.start()] + 'Z'
    # Append an explicit 'UTC' zone name so strptime() parses the time as
    # UTC, matching calendar.timegm(), which interprets its input as UTC.
    whole_secs = calendar.timegm(time.strptime(timestr + 'UTC',
                                               ARVADOS_TIMEFMT + '%Z'))
    return whole_secs + subsecs
30
def timestamp_fresh(timestamp, fresh_time):
    """Return True if timestamp is less than fresh_time seconds in the past.

    timestamp is in seconds since the epoch and is compared against the
    current time.time().
    """
    age = time.time() - timestamp
    return age < fresh_time
33
def arvados_node_missing(arvados_node, fresh_time):
    """Report whether the cloud node behind an Arvados node is "missing".

    Returns True when the node's last ping to the API server is at least
    fresh_time seconds old, False when the last ping is more recent than
    that, and None when the node has never pinged ('last_ping_at' is None).
    """
    last_ping = arvados_node["last_ping_at"]
    if last_ping is None:
        return None
    ping_time = arvados_timestamp(last_ping)
    return not timestamp_fresh(ping_time, fresh_time)
46
def _retry(errors=()):
    """Retry decorator for an actor method that makes remote requests.

    Use this function to decorate an actor method, and pass in a
    tuple of exceptions to catch.  This decorator will schedule
    retries of that method with exponential backoff if the
    original method raises a known cloud driver error, or any of the
    given exception types.

    The object the method is bound to must provide these attributes, all
    of which the wrapper uses: retry_wait, min_retry_wait, max_retry_wait,
    _cloud (with is_cloud_exception()), _logger, _timer (with schedule())
    and _later (a proxy exposing the same method name).

    The wrapped method returns the original method's result on success.
    When a retry is scheduled instead, it returns None.  Exceptions that
    are neither in `errors` nor recognized by the cloud driver propagate
    unchanged.
    """
    def decorator(orig_func):
        @functools.wraps(orig_func)
        def retry_wrapper(self, *args, **kwargs):
            start_time = time.time()
            try:
                # Hold the result instead of returning from inside the try
                # suite: a `return` here would skip the else clause, and
                # the backoff would never be reset after a success.
                result = orig_func(self, *args, **kwargs)
            except Exception as error:
                if not (isinstance(error, errors) or
                        self._cloud.is_cloud_exception(error)):
                    raise
                self._logger.warning(
                    "Client error: %s - waiting %s seconds",
                    error, self.retry_wait)
                # Re-invoke the same method, with the same arguments,
                # retry_wait seconds after this attempt started.
                self._timer.schedule(start_time + self.retry_wait,
                                     getattr(self._later,
                                             orig_func.__name__),
                                     *args, **kwargs)
                # Exponential backoff, capped at max_retry_wait.
                self.retry_wait = min(self.retry_wait * 2,
                                      self.max_retry_wait)
            else:
                # Success: the next failure starts from the shortest wait.
                self.retry_wait = self.min_retry_wait
                return result
        return retry_wrapper
    return decorator
79
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started,
    and a list of durations (in minutes) of when the node must not and may
    be shut down, alternating.  The class will tell you when a shutdown
    window is open, and when the next open window will start.

    shutdown_windows must contain at least two durations (one closed, one
    open); a shorter list raises IndexError.  The list of windows repeats
    indefinitely.
    """
    def __init__(self, start_time, shutdown_windows):
        # The implementation is easiest if we have an even number of windows,
        # because then windows always alternate between open and closed.
        # Rig that up: calculate the first shutdown window based on what's
        # passed in.  Then, if we were given an odd number of windows, merge
        # that first window into the last one, since they both represent
        # closed state.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            shutdown_windows.append(first_window)
        else:
            shutdown_windows[-1] += first_window
        # Endless (open, closed, open, closed, ...) durations, in seconds.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        self._open_start = self._next_opening
        # _open_for is the length of the window that starts at
        # _next_opening.  _current_open_for is the length of the window
        # that starts at _open_start.  They are tracked separately because
        # consecutive open windows may have different lengths.
        self._open_for = next(self.shutdown_windows)
        self._current_open_for = self._open_for

    def _advance_opening(self):
        """Roll the window state forward until _next_opening is not past."""
        while self._next_opening < time.time():
            # The window at _next_opening has started: it becomes current.
            self._open_start = self._next_opening
            self._current_open_for = self._open_for
            # The following opening comes after this open window and the
            # closed window behind it.
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the timestamp at which the next shutdown window opens."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if the current time is inside an open window."""
        self._advance_opening()
        return 0 < (time.time() - self._open_start) < self._current_open_for