X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8f9f0dece977ccf5a778f3b3bd2379375e723c18..cac5db66cba0d5dd97c8434853bcbf2ab19fbda5:/services/nodemanager/arvnodeman/computenode/__init__.py

diff --git a/services/nodemanager/arvnodeman/computenode/__init__.py b/services/nodemanager/arvnodeman/computenode/__init__.py
index 4955992faa..54d6a82bce 100644
--- a/services/nodemanager/arvnodeman/computenode/__init__.py
+++ b/services/nodemanager/arvnodeman/computenode/__init__.py
@@ -2,20 +2,119 @@
 
 from __future__ import absolute_import, print_function
 
+import calendar
+import functools
 import itertools
+import re
 import time
 
+ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
+ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
+
 def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'):
     hostname = arvados_node.get('hostname') or default_hostname
     return '{}.{}'.format(hostname, arvados_node['domain'])
 
 def arvados_node_mtime(node):
-    return time.mktime(time.strptime(node['modified_at'] + 'UTC',
-                                     '%Y-%m-%dT%H:%M:%SZ%Z')) - time.timezone
+    return arvados_timestamp(node['modified_at'])
+
+def arvados_timestamp(timestr):
+    """Convert an Arvados API timestamp string to seconds since the epoch.
+
+    timestr is expected in ARVADOS_TIMEFMT form, optionally with fractional
+    seconds before the trailing 'Z'.  It is interpreted as UTC, and the
+    fractional seconds are preserved in the returned float.
+    """
+    subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr)
+    if subsec_match is None:
+        subsecs = .0
+    else:
+        subsecs = float(subsec_match.group(1))
+        timestr = timestr[:subsec_match.start()] + 'Z'
+    # ARVADOS_TIMEFMT has no fractional-seconds field, so the fraction was
+    # stripped above for strptime and must be added back to the result.
+    return calendar.timegm(time.strptime(timestr + 'UTC',
+                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
 
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
 
+def arvados_node_missing(arvados_node, fresh_time):
+    """Indicate if cloud node corresponding to the arvados
+    node is "missing".
+
+    If True, this means the node has not pinged the API server within the timeout
+    period. If False, the ping is up to date. If the node has never pinged,
+    returns None.
+    """
+    if arvados_node["last_ping_at"] is None:
+        return None
+    else:
+        return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
+
+class RetryMixin(object):
+    """Mixin providing _retry, a decorator for methods making remote requests.
+
+    Use _retry to decorate a method, and pass in a tuple of exceptions to
+    catch.  If the original method raises a known cloud driver error, or
+    any of the given exception types, the decorator retries with exponential
+    backoff (capped at max_retry_wait), either by sleeping (if self._timer is
+    None) or by scheduling retries of the method (if self._timer is a timer
+    actor; in that case the wrapper returns None after scheduling).
+
+    """
+    def __init__(self, retry_wait, max_retry_wait,
+                 logger, cloud, timer=None):
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self._logger = logger
+        self._cloud = cloud
+        self._timer = timer
+
+    @staticmethod
+    def _retry(errors=()):
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def retry_wrapper(self, *args, **kwargs):
+                while True:
+                    try:
+                        ret = orig_func(self, *args, **kwargs)
+                    except Exception as error:
+                        if not (isinstance(error, errors) or
+                                self._cloud.is_cloud_exception(error)):
+                            self.retry_wait = self.min_retry_wait
+                            self._logger.warning(
+                                "Re-raising unknown error (no retry): %s",
+                                error, exc_info=error)
+                            raise
+
+                        self._logger.warning(
+                            "Client error: %s - waiting %s seconds",
+                            error, self.retry_wait, exc_info=error)
+
+                        if self._timer:
+                            start_time = time.time()
+                            # reschedule to be called again
+                            self._timer.schedule(start_time + self.retry_wait,
+                                                 getattr(self._later,
+                                                         orig_func.__name__),
+                                                 *args, **kwargs)
+                        else:
+                            # sleep on it.
+                            time.sleep(self.retry_wait)
+
+                        self.retry_wait = min(self.retry_wait * 2,
+                                              self.max_retry_wait)
+                        if self._timer:
+                            # expect to be called again by timer so don't loop
+                            return
+                    else:
+                        self.retry_wait = self.min_retry_wait
+                        return ret
+            return retry_wrapper
+        return decorator
+
 
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.