from __future__ import absolute_import, print_function
+import calendar
+import functools
import itertools
+import re
import time
# time.strptime format for Arvados timestamps with whole seconds; the
# trailing 'Z' is matched literally.
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
# Matches a fractional-seconds part (group 1, including the leading dot)
# that sits immediately before the trailing 'Z' of a timestamp string.
+ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
hostname = arvados_node.get('hostname') or default_hostname
return arvados_timestamp(node['modified_at'])
def arvados_timestamp(timestr):
    """Convert an Arvados timestamp string to seconds since the Unix epoch.

    `timestr` is a UTC timestamp in ARVADOS_TIMEFMT form, optionally with a
    fractional-seconds part right before the trailing 'Z' (for example
    '2015-01-01T00:00:00.25Z').

    Returns the epoch time as a float, including any fractional seconds.
    Raises ValueError (from time.strptime) if the string does not match
    the expected format.
    """
    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
    if subsec_match is None:
        subsecs = .0
    else:
        subsecs = float(subsec_match.group(1))
        # ARVADOS_TIMEFMT has no field for the fraction, so cut it out and
        # put back the 'Z' that the regexp match consumed.
        timestr = timestr[:subsec_match.start()] + 'Z'
    # calendar.timegm interprets the parsed struct_time as UTC, so the result
    # does not depend on the local timezone.  The fraction parsed above is
    # added back so sub-second precision is not lost.
    return calendar.timegm(time.strptime(timestr + 'UTC',
                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
def timestamp_fresh(timestamp, fresh_time):
    """Return whether `timestamp` is less than `fresh_time` seconds old.

    `timestamp` is in seconds since the epoch, on the same clock as
    time.time().
    """
    age = time.time() - timestamp
    return age < fresh_time
def arvados_node_missing(arvados_node, fresh_time):
    """Indicate whether the cloud node for an Arvados node record is "missing".

    `arvados_node` is a mapping holding the node record; `fresh_time` is the
    ping timeout in seconds.

    Returns True if the node's last ping is `fresh_time` seconds old or
    older, False if the last ping is more recent than that, and None if the
    node has never pinged (its "last_ping_at" is None or not present).
    """
    # .get() rather than indexing: a record without the key is treated the
    # same as one that has never pinged instead of raising KeyError.
    last_ping = arvados_node.get("last_ping_at")
    if last_ping is None:
        return None
    return not timestamp_fresh(arvados_timestamp(last_ping), fresh_time)
+
class RetryMixin(object):
    """Mixin providing a retry decorator for methods that make remote requests.

    Decorate a method with `RetryMixin._retry(errors)`, passing the exception
    types that should trigger a retry.  If the method raises one of those
    types, or an error that `self._cloud.is_cloud_exception()` recognizes,
    the call is retried with exponential backoff: the wait starts at
    `retry_wait` seconds, doubles after every failure up to
    `max_retry_wait`, and is reset after a success or a non-retryable error.

    How the retry happens depends on `self._timer`:

    * If it is not set, the wrapper sleeps and calls the method again in a
      loop, and eventually returns the method's result.
    * If it is set, the wrapper schedules `self._later.<method name>` on the
      timer and returns None right away.  `self._later` is not set by this
      class, so the class using the mixin must provide it.

    Any other exception is logged and re-raised without retrying.
    """
    def __init__(self, retry_wait, max_retry_wait,
                 logger, cloud, timer=None):
        """Store the backoff settings and collaborators used by `_retry`.

        retry_wait -- initial (and minimum) wait between attempts, in seconds
        max_retry_wait -- upper bound for the wait, in seconds
        logger -- object whose warning() method receives retry messages
        cloud -- object providing is_cloud_exception(error)
        timer -- optional object providing schedule(when, func, *args,
                 **kwargs); when given, retries are scheduled instead of
                 slept on
        """
        self.min_retry_wait = retry_wait
        self.max_retry_wait = max_retry_wait
        self.retry_wait = retry_wait
        self._logger = logger
        self._cloud = cloud
        self._timer = timer

    @staticmethod
    def _retry(errors=()):
        """Return a decorator that retries the wrapped method on known errors.

        `errors` names the exception types to retry on, in addition to
        whatever `self._cloud.is_cloud_exception()` accepts.  It may be a
        tuple, any other iterable of exception classes, or a single class.
        """
        # isinstance() only accepts a class or a tuple of classes.  Normalize
        # once here so that, e.g., a list does not raise TypeError from
        # inside the except handler and hide the original error.
        if isinstance(errors, type):
            errors = (errors,)
        else:
            errors = tuple(errors)

        def decorator(orig_func):
            @functools.wraps(orig_func)
            def retry_wrapper(self, *args, **kwargs):
                while True:
                    try:
                        ret = orig_func(self, *args, **kwargs)
                    except Exception as error:
                        if not (isinstance(error, errors) or
                                self._cloud.is_cloud_exception(error)):
                            # Not an error we know how to recover from:
                            # reset the backoff and let the caller see it.
                            self.retry_wait = self.min_retry_wait
                            self._logger.warning(
                                "Re-raising unknown error (no retry): %s",
                                error, exc_info=error)
                            raise

                        self._logger.warning(
                            "Client error: %s - waiting %s seconds",
                            error, self.retry_wait, exc_info=error)

                        if self._timer:
                            start_time = time.time()
                            # reschedule to be called again
                            self._timer.schedule(start_time + self.retry_wait,
                                                 getattr(self._later,
                                                         orig_func.__name__),
                                                 *args, **kwargs)
                        else:
                            # sleep on it.
                            time.sleep(self.retry_wait)

                        # Exponential backoff, capped at max_retry_wait.
                        self.retry_wait = min(self.retry_wait * 2,
                                              self.max_retry_wait)
                        if self._timer:
                            # expect to be called again by timer so don't loop
                            return
                    else:
                        # Success: the next failure starts from the minimum
                        # wait again.
                        self.retry_wait = self.min_retry_wait
                        return ret
            return retry_wrapper
        return decorator
+
class ShutdownTimer(object):
"""Keep track of a cloud node's shutdown windows.