Merge branch '6605-arv-copy-http' closes #6605
[arvados.git] / services / nodemanager / arvnodeman / computenode / __init__.py
1 #!/usr/bin/env python
2
3 from __future__ import absolute_import, print_function
4
5 import calendar
6 import functools
7 import itertools
8 import re
9 import time
10
# strptime/strftime-style layout of an Arvados timestamp at whole-second
# resolution, with a literal trailing 'Z'.
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Matches a fractional-seconds part (a dot followed by digits) that sits
# immediately before the trailing 'Z'; group 1 captures the fraction
# including its leading dot (e.g. '.123').
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
13
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
    """Build the fully qualified domain name for an Arvados node record.

    The record's 'hostname' is used when it is present and truthy;
    otherwise default_hostname stands in for it.  The result is that
    hostname joined to the record's 'domain' with a dot.  A record without
    a 'domain' key raises KeyError.
    """
    host = arvados_node.get('hostname')
    if not host:
        host = default_hostname
    domain = arvados_node['domain']
    return '{}.{}'.format(host, domain)
17
def arvados_node_mtime(node):
    """Return the node record's 'modified_at' value as an epoch timestamp.

    The conversion is delegated to arvados_timestamp().
    """
    modified_at = node['modified_at']
    return arvados_timestamp(modified_at)
20
def arvados_timestamp(timestr):
    """Convert an Arvados timestamp string to seconds since the epoch.

    timestr must follow ARVADOS_TIMEFMT, optionally with a fractional
    seconds part directly before the trailing 'Z' (for example
    '2015-01-01T00:00:00.250Z').  The string is interpreted as UTC.

    Returns the timestamp as a float, including any fractional seconds.
    Raises ValueError (from time.strptime) when timestr does not match the
    expected format.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        subsecs = 0.0
    else:
        # strptime has no directive for the fraction in ARVADOS_TIMEFMT, so
        # split it off, remember it, and parse the whole-second remainder.
        subsecs = float(subsec_match.group(1))
        timestr = timestr[:subsec_match.start()] + 'Z'
    # Appending an explicit 'UTC' zone name (parsed by '%Z') and using
    # calendar.timegm keeps the conversion independent of the local timezone.
    whole_secs = calendar.timegm(time.strptime(timestr + 'UTC',
                                               ARVADOS_TIMEFMT + '%Z'))
    # The fraction was previously parsed and then discarded; add it back so
    # sub-second precision is not silently lost.
    return whole_secs + subsecs
30
def timestamp_fresh(timestamp, fresh_time):
    """Tell whether timestamp lies less than fresh_time seconds in the past.

    The age is measured against the current time.time(), so timestamp is
    expected to be in seconds since the epoch.
    """
    age = time.time() - timestamp
    return age < fresh_time
33
def arvados_node_missing(arvados_node, fresh_time):
    """Report whether the cloud node behind an Arvados node record is "missing".

    Returns True when the node's last ping to the API server is older than
    fresh_time seconds, False when the last ping is recent enough, and None
    when the node has never pinged at all (its "last_ping_at" is None).
    """
    last_ping = arvados_node["last_ping_at"]
    if last_ping is None:
        return None
    ping_time = arvados_timestamp(last_ping)
    return not timestamp_fresh(ping_time, fresh_time)
46
class RetryMixin(object):
    """Mixin that adds retry-with-backoff support for remote requests.

    Decorate a method with RetryMixin._retry(), passing a tuple of exception
    types to catch.  When the method raises one of those types, or an error
    that self._cloud.is_cloud_exception() recognizes, the call is retried
    with exponential backoff: either by sleeping in a loop (when self._timer
    is not set) or by scheduling another call of the method through
    self._timer (when a timer actor was supplied).  Any other exception is
    logged and re-raised without a retry.
    """
    def __init__(self, retry_wait, max_retry_wait,
                 logger, cloud, timer=None):
        self._logger = logger
        self._cloud = cloud
        self._timer = timer
        # The wait starts at retry_wait, doubles after every retried failure
        # up to max_retry_wait, and drops back to retry_wait on success or
        # on an unknown error.
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait

    @staticmethod
    def _retry(errors=()):
        def next_wait(obj):
            # Double the current wait, capped at the configured maximum.
            return min(obj.retry_wait * 2, obj.max_retry_wait)

        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    try:
                        result = orig_func(self, *args, **kwargs)
                    except Exception as error:
                        retryable = (
                            isinstance(error, errors) or
                            self._cloud.is_cloud_exception(error))
                        if not retryable:
                            self.retry_wait = self.min_retry_wait
                            self._logger.warning(
                                "Re-raising unknown error (no retry): %s",
                                error, exc_info=error)
                            raise

                        self._logger.warning(
                            "Client error: %s - waiting %s seconds",
                            error, self.retry_wait, exc_info=error)

                        if self._timer:
                            # Ask the timer to invoke this same method again
                            # later, then stop: the timer drives the retry,
                            # so there is nothing to loop on here.
                            run_at = time.time() + self.retry_wait
                            self._timer.schedule(
                                run_at,
                                getattr(self._later, orig_func.__name__),
                                *args, **kwargs)
                            self.retry_wait = next_wait(self)
                            return

                        # No timer available: block for the current wait and
                        # go around the loop for another attempt.
                        time.sleep(self.retry_wait)
                        self.retry_wait = next_wait(self)
                        continue

                    # Success: reset the backoff and hand back the result.
                    self.retry_wait = self.min_retry_wait
                    return result
            return retry_wrapper
        return decorator
109
class ShutdownTimer(object):
    """Keep track of a cloud node's shutdown windows.

    Instantiate this class with a timestamp of when a cloud node started
    (seconds since the epoch, comparable to time.time()), and a list of
    durations (in minutes) of when the node must not and may be shut down,
    alternating.  The first duration is a period when the node must not be
    shut down.  The class will tell you when a shutdown window is open, and
    when the next open window will start.

    At least two durations are required; passing fewer raises IndexError.
    """
    def __init__(self, start_time, shutdown_windows):
        # The implementation is easiest if we have an even number of windows,
        # because then windows always alternate between open and closed.
        # Rig that up: calculate the first shutdown window based on what's
        # passed in.  Then, if we were given an odd number of windows, merge
        # that first window into the last one, since they both represent
        # closed state.
        first_window = shutdown_windows[0]
        shutdown_windows = list(shutdown_windows[1:])
        self._next_opening = start_time + (60 * first_window)
        if len(shutdown_windows) % 2:
            # Even number of windows given: the cycle simply wraps around to
            # the initial closed window.
            shutdown_windows.append(first_window)
        else:
            # Odd number given: the trailing closed window runs straight into
            # the initial closed window of the next cycle.
            shutdown_windows[-1] += first_window
        # Endless alternation of (open, closed) durations, in seconds.
        self.shutdown_windows = itertools.cycle([60 * n
                                                 for n in shutdown_windows])
        self._open_start = self._next_opening
        # Length of the open window that starts at self._next_opening.
        self._open_for = next(self.shutdown_windows)
        # Length of the open window that starts at self._open_start.  Kept
        # separately from _open_for because _advance_opening() moves _open_for
        # on to the following window, which may have a different length.
        self._current_open_for = self._open_for

    def _advance_opening(self):
        """Move the bookkeeping forward until _next_opening is in the future."""
        while self._next_opening < time.time():
            # The window that was "next" has begun: it becomes the current
            # one, together with its own duration.
            self._open_start = self._next_opening
            self._current_open_for = self._open_for
            # The following opening starts after this open window plus the
            # closed window that comes after it.
            self._next_opening += self._open_for + next(self.shutdown_windows)
            self._open_for = next(self.shutdown_windows)

    def next_opening(self):
        """Return the start time of the next shutdown window yet to open."""
        self._advance_opening()
        return self._next_opening

    def window_open(self):
        """Return True if the current time falls inside an open window."""
        self._advance_opening()
        # Compare against the duration of the window that started at
        # _open_start, not the upcoming one, so windows of unequal length
        # close at the right moment.
        return 0 < (time.time() - self._open_start) < self._current_open_for