#!/usr/bin/env python # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: AGPL-3.0 from __future__ import absolute_import, print_function import calendar import functools import itertools import re import time from ..config import CLOUD_ERRORS from ..status import tracker from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ' ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$') def arvados_node_fqdn(arvados_node, default_hostname='dynamic.compute'): hostname = arvados_node.get('hostname') or default_hostname return '{}.{}'.format(hostname, arvados_node['domain']) def arvados_node_mtime(node): return arvados_timestamp(node['modified_at']) def arvados_timestamp(timestr): subsec_match = ARVADOS_TIMESUBSEC_RE.search(timestr) if subsec_match is None: subsecs = .0 else: subsecs = float(subsec_match.group(1)) timestr = timestr[:subsec_match.start()] + 'Z' return calendar.timegm(time.strptime(timestr + 'UTC', ARVADOS_TIMEFMT + '%Z')) + subsecs def timestamp_fresh(timestamp, fresh_time): return (time.time() - timestamp) < fresh_time def arvados_node_missing(arvados_node, fresh_time): """Indicate if cloud node corresponding to the arvados node is "missing". If True, this means the node has not pinged the API server within the timeout period. If False, the ping is up to date. If the node has never pinged, returns None. """ if arvados_node["last_ping_at"] is None: return None else: return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time) class RetryMixin(object): """Retry decorator for an method that makes remote requests. Use this function to decorate method, and pass in a tuple of exceptions to catch. If the original method raises a known cloud driver error, or any of the given exception types, this decorator will either go into a sleep-and-retry loop with exponential backoff either by sleeping (if self._timer is None) or by scheduling retries of the method (if self._timer is a timer actor.) """ def __init__(self, retry_wait, max_retry_wait, logger, cloud, timer=None): self.min_retry_wait = max(1, retry_wait) self.max_retry_wait = max(self.min_retry_wait, max_retry_wait) self.retry_wait = retry_wait self._logger = logger self._cloud = cloud self._timer = timer @staticmethod def _retry(errors=()): def decorator(orig_func): @functools.wraps(orig_func) def retry_wrapper(self, *args, **kwargs): while True: should_retry = False try: ret = orig_func(self, *args, **kwargs) except RateLimitReachedError as error: # If retry-after is zero, continue with exponential # backoff. if error.retry_after != 0: self.retry_wait = error.retry_after should_retry = True except BaseHTTPError as error: if error.headers and error.headers.get("retry-after"): try: retry_after = int(error.headers["retry-after"]) # If retry-after is zero, continue with # exponential backoff. if retry_after != 0: self.retry_wait = retry_after should_retry = True except ValueError: self._logger.warning( "Unrecognizable Retry-After header: %r", error.headers["retry-after"], exc_info=error) if error.code == 429 or error.code >= 500: should_retry = True except CLOUD_ERRORS as error: tracker.counter_add('cloud_errors') should_retry = True except errors as error: should_retry = True except Exception as error: # As a libcloud workaround for drivers that don't use # typed exceptions, consider bare Exception() objects # retryable. if type(error) is Exception: tracker.counter_add('cloud_errors') should_retry = True else: # No exception self.retry_wait = self.min_retry_wait return ret # Only got here if an exception was caught. Now determine what to do about it. if not should_retry: self.retry_wait = self.min_retry_wait self._logger.warning( "Re-raising error (no retry): %s", error, exc_info=error) raise # Retry wait out of bounds? if self.retry_wait < self.min_retry_wait: self.retry_wait = self.min_retry_wait elif self.retry_wait > self.max_retry_wait: self.retry_wait = self.max_retry_wait self._logger.warning( "Client error: %s - %s %s seconds", error, "scheduling retry in" if self._timer else "sleeping", self.retry_wait, exc_info=error) if self._timer: start_time = time.time() # reschedule to be called again self._timer.schedule(start_time + self.retry_wait, getattr(self._later, orig_func.__name__), *args, **kwargs) else: # sleep on it. time.sleep(self.retry_wait) self.retry_wait = min(self.retry_wait * 2, self.max_retry_wait) if self._timer: # expect to be called again by timer so don't loop return return retry_wrapper return decorator class ShutdownTimer(object): """Keep track of a cloud node's shutdown windows. Instantiate this class with a timestamp of when a cloud node started, and a list of durations (in minutes) of when the node must not and may be shut down, alternating. The class will tell you when a shutdown window is open, and when the next open window will start. """ def __init__(self, start_time, shutdown_windows): # The implementation is easiest if we have an even number of windows, # because then windows always alternate between open and closed. # Rig that up: calculate the first shutdown window based on what's # passed in. Then, if we were given an odd number of windows, merge # that first window into the last one, since they both# represent # closed state. first_window = shutdown_windows[0] shutdown_windows = list(shutdown_windows[1:]) self._next_opening = start_time + (60 * first_window) if len(shutdown_windows) % 2: shutdown_windows.append(first_window) else: shutdown_windows[-1] += first_window self.shutdown_windows = itertools.cycle([60 * n for n in shutdown_windows]) self._open_start = self._next_opening self._open_for = next(self.shutdown_windows) def _advance_opening(self): while self._next_opening < time.time(): self._open_start = self._next_opening self._next_opening += self._open_for + next(self.shutdown_windows) self._open_for = next(self.shutdown_windows) def next_opening(self): self._advance_opening() return self._next_opening def window_open(self): self._advance_opening() return 0 < (time.time() - self._open_start) < self._open_for