import re
import time
+from ..config import CLOUD_ERRORS
+from libcloud.common.exceptions import BaseHTTPError
+
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
@functools.wraps(orig_func)
def retry_wrapper(self, *args, **kwargs):
while True:
+ should_retry = False
try:
ret = orig_func(self, *args, **kwargs)
+ except BaseHTTPError as error:
+ if error.headers and error.headers.get("retry-after"):
+ try:
+ self.retry_wait = int(error.headers["retry-after"])
+ if self.retry_wait < 0 or self.retry_wait > self.max_retry_wait:
+ self.retry_wait = self.max_retry_wait
+ should_retry = True
+ except ValueError:
+ pass
+ if error.code == 429 or error.code >= 500:
+ should_retry = True
+ except CLOUD_ERRORS as error:
+ should_retry = True
+ except errors as error:
+ should_retry = True
except Exception as error:
- if not (isinstance(error, errors) or
- self._cloud.is_cloud_exception(error)):
- self.retry_wait = self.min_retry_wait
- self._logger.warning(
- "Re-raising unknown error (no retry): %s",
- error, exc_info=error)
- raise
-
- self._logger.warning(
- "Client error: %s - %s %s seconds",
- error,
- "scheduling retry in" if self._timer else "sleeping",
- self.retry_wait,
- exc_info=error)
-
- if self._timer:
- start_time = time.time()
- # reschedule to be called again
- self._timer.schedule(start_time + self.retry_wait,
- getattr(self._later,
- orig_func.__name__),
- *args, **kwargs)
- else:
- # sleep on it.
- time.sleep(self.retry_wait)
-
- self.retry_wait = min(self.retry_wait * 2,
- self.max_retry_wait)
- if self._timer:
- # expect to be called again by timer so don't loop
- return
+ # As a libcloud workaround for drivers that don't use
+ # typed exceptions, consider bare Exception() objects
+ # retryable.
+ should_retry = type(error) is Exception
else:
+                # No exception: reset the retry wait and return the result.
self.retry_wait = self.min_retry_wait
return ret
+
+ # Only got here if an exception was caught. Now determine what to do about it.
+ if not should_retry:
+ self.retry_wait = self.min_retry_wait
+ self._logger.warning(
+ "Re-raising error (no retry): %s",
+ error, exc_info=error)
+ raise
+
+ self._logger.warning(
+ "Client error: %s - %s %s seconds",
+ error,
+ "scheduling retry in" if self._timer else "sleeping",
+ self.retry_wait,
+ exc_info=error)
+
+ if self._timer:
+ start_time = time.time()
+ # reschedule to be called again
+ self._timer.schedule(start_time + self.retry_wait,
+ getattr(self._later,
+ orig_func.__name__),
+ *args, **kwargs)
+ else:
+ # sleep on it.
+ time.sleep(self.retry_wait)
+
+ self.retry_wait = min(self.retry_wait * 2,
+ self.max_retry_wait)
+ if self._timer:
+ # expect to be called again by timer so don't loop
+ return
+
return retry_wrapper
return decorator
import re
import libcloud.common.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
+
import pykka
from .. import \
try:
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
- except Exception as e:
+ except BaseHTTPError as e:
+ if e.code == 429 or "RequestLimitExceeded" in e.message:
+ # Don't consider API rate limits to be quota errors.
+ # re-raise so the Retry logic applies.
+ raise
+
# The set of possible error codes / messages isn't documented for
# all clouds, so use a keyword heuristic to determine if the
# failure is likely due to a quota.
self._finished()
return
else:
+ # Something else happened, re-raise so the Retry logic applies.
raise
+ except Exception as e:
+ raise
# The information included in the node size object we get from libcloud
# is inconsistent between cloud drivers. Replace libcloud NodeSize
self._finished(success_flag=True)
-class ComputeNodeUpdateActor(config.actor_class):
+class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
"""Actor to dispatch one-off cloud management requests.
This actor receives requests for small cloud updates, and
dedicated actor for this gives us the opportunity to control the
flow of requests; e.g., by backing off when errors occur.
"""
- def __init__(self, cloud_factory, max_retry_wait=180):
+ def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
super(ComputeNodeUpdateActor, self).__init__()
+ RetryMixin.__init__(self, 1, max_retry_wait,
+ None, cloud_factory(), timer_actor)
self._cloud = cloud_factory()
- self.max_retry_wait = max_retry_wait
- self.error_streak = 0
- self.next_request_time = time.time()
+ self._later = self.actor_ref.tell_proxy()
def _set_logger(self):
self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
def on_start(self):
self._set_logger()
- def _throttle_errors(orig_func):
- @functools.wraps(orig_func)
- def throttle_wrapper(self, *args, **kwargs):
- throttle_time = self.next_request_time - time.time()
- if throttle_time > 0:
- time.sleep(throttle_time)
- self.next_request_time = time.time()
- try:
- result = orig_func(self, *args, **kwargs)
- except Exception as error:
- if self._cloud.is_cloud_exception(error):
- self.error_streak += 1
- self.next_request_time += min(2 ** self.error_streak,
- self.max_retry_wait)
- self._logger.warn(
- "Unhandled exception: %s", error, exc_info=error)
- else:
- self.error_streak = 0
- return result
- return throttle_wrapper
-
- @_throttle_errors
+ @RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
return self._cloud.sync_node(cloud_node, arvados_node)
self._logger.info("Draining SLURM node %s", self._nodename)
self._later.issue_slurm_drain()
- @RetryMixin._retry((subprocess.CalledProcessError,))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self, reason, try_resume=True):
if self._nodename:
if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
- @RetryMixin._retry((subprocess.CalledProcessError,))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
def issue_slurm_drain(self):
if self.cancel_reason is not None:
return
else:
self._later.shutdown_node()
- @RetryMixin._retry((subprocess.CalledProcessError,))
+ @RetryMixin._retry((subprocess.CalledProcessError, OSError))
def await_slurm_drain(self):
if self.cancel_reason is not None:
return
from operator import attrgetter
import libcloud.common.types as cloud_types
-from libcloud.common.exceptions import BaseHTTPError
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
-from ...config import NETWORK_ERRORS
+from ...config import CLOUD_ERRORS
from .. import RetryMixin
class BaseComputeNodeDriver(RetryMixin):
Subclasses must implement arvados_create_kwargs, sync_node,
node_fqdn, and node_start_time.
"""
- CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError, BaseHTTPError)
+
@RetryMixin._retry()
def _create_driver(self, driver_class, **auth_kwargs):
kwargs.update(self.arvados_create_kwargs(size, arvados_node))
kwargs['size'] = size
return self.real.create_node(**kwargs)
- except self.CLOUD_ERRORS as create_error:
+ except CLOUD_ERRORS as create_error:
# Workaround for bug #6702: sometimes the create node request
# succeeds but times out and raises an exception instead of
# returning a result. If this happens, we get stuck in a retry
# seconds since the epoch UTC.
raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
- @classmethod
- def is_cloud_exception(cls, exception):
- # libcloud compute drivers typically raise bare Exceptions to
- # represent API errors. Return True for any exception that is
- # exactly an Exception, or a better-known higher-level exception.
- return (isinstance(exception, cls.CLOUD_ERRORS) or
- type(exception) is Exception)
-
def destroy_node(self, cloud_node):
try:
return self.real.destroy_node(cloud_node)
- except self.CLOUD_ERRORS as destroy_error:
+ except CLOUD_ERRORS as destroy_error:
# Sometimes the destroy node request succeeds but times out and
# raises an exception instead of returning success. If this
# happens, we get a noisy stack trace. Check if the node is still
DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.AZURE_ARM)
SEARCH_CACHE = {}
- CLOUD_ERRORS = BaseComputeNodeDriver.CLOUD_ERRORS + (BaseHTTPError,)
def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
driver_class=DEFAULT_DRIVER):
from .baseactor import BaseNodeManagerActor
+from libcloud.common.types import LibcloudError
+from libcloud.common.exceptions import BaseHTTPError
+
# IOError is the base class for socket.error, ssl.SSLError, and friends.
# It seems like it hits the sweet spot for operations we want to retry:
# it's low-level, but unlikely to catch code bugs.
NETWORK_ERRORS = (IOError,)
ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
+CLOUD_ERRORS = NETWORK_ERRORS + (LibcloudError, BaseHTTPError)
actor_class = BaseNodeManagerActor
server_calculator = build_server_calculator(config)
timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
launch_pollers(config, server_calculator)
- cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+ cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
node_daemon = NodeManagerDaemonActor.start(
job_queue_poller, arvados_node_poller, cloud_node_poller,
cloud_node_updater, timer,
ex_tags=None,
ex_network=None):
raise Exception("nope")
+
+class RetryDriver(FakeDriver):
+ def create_node(self, name=None,
+ size=None,
+ image=None,
+ auth=None,
+ ex_storage_account=None,
+ ex_customdata=None,
+ ex_resource_group=None,
+ ex_user_name=None,
+ ex_tags=None,
+ ex_network=None):
+ global create_calls
+ create_calls += 1
+ if create_calls < 2:
+ raise BaseHTTPError(429, "Rate limit exceeded",
+ {'retry-after': '12'})
+ else:
+ return super(RetryDriver, self).create_node(name=name,
+ size=size,
+ image=image,
+ auth=auth,
+ ex_storage_account=ex_storage_account,
+ ex_customdata=ex_customdata,
+ ex_resource_group=ex_resource_group,
+ ex_user_name=ex_user_name,
+ ex_tags=ex_tags,
+ ex_network=ex_network)
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+ }),
+ "test_retry_create": (
+ [
+ (r".*Daemon started", set_squeue),
+ (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
+ (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
+ ],
+ {},
+ "arvnodeman.test.fake_driver.RetryDriver",
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
})
}
self.api_client.nodes().update().execute.side_effect = arvados_effect
self.cloud_client = mock.MagicMock(name='cloud_client')
self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
- self.cloud_client.is_cloud_exception = BaseComputeNodeDriver.is_cloud_exception
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
def make_actor(self):
self.driver = mock.MagicMock(name='driver_mock')
- self.updater = self.ACTOR_CLASS.start(self.driver).proxy()
+ self.timer = mock.MagicMock(name='timer_mock')
+ self.updater = self.ACTOR_CLASS.start(self.driver, self.timer).proxy()
def test_node_sync(self, *args):
self.make_actor()
for s in args:
self.assertIn(s, slurm_cmd)
- def check_success_after_reset(self, proc_mock, end_state='drain\n'):
+ def check_success_after_reset(self, proc_mock, end_state='drain\n', timer=False):
self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
+ if not timer:
+ self.timer = testutil.MockTimer(False)
self.make_actor()
self.check_success_flag(None, 0)
+ self.timer.deliver()
self.check_success_flag(None, 0)
+ self.timer.deliver()
# Order is critical here: if the mock gets called when no return value
# or side effect is set, we may invoke a real subprocess.
proc_mock.return_value = end_state
def test_issue_slurm_drain_retry(self, proc_mock):
proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
- self.check_success_after_reset(proc_mock)
+ self.check_success_after_reset(proc_mock, timer=False)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
proc_mock.return_value = 'drain\n'
node.extra = {'tags': {"hostname": name}}
self.assertEqual(name, azure.ComputeNodeDriver.node_fqdn(node))
- def test_cloud_exceptions(self):
- for error in [Exception("test exception"),
- IOError("test exception"),
- ssl.SSLError("test exception"),
- cloud_types.LibcloudError("test exception")]:
- self.assertTrue(azure.ComputeNodeDriver.is_cloud_exception(error),
- "{} not flagged as cloud exception".format(error))
-
- def test_noncloud_exceptions(self):
- self.assertFalse(
- azure.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
- "ValueError flagged as cloud exception")
-
def test_sync_node(self):
arv_node = testutil.arvados_node_mock(1)
cloud_node = testutil.cloud_node_mock(2)
node = testutil.cloud_node_mock()
node.name = name
self.assertEqual(name, ec2.ComputeNodeDriver.node_fqdn(node))
-
- def test_cloud_exceptions(self):
- for error in [Exception("test exception"),
- IOError("test exception"),
- ssl.SSLError("test exception"),
- cloud_types.LibcloudError("test exception")]:
- self.assertTrue(ec2.ComputeNodeDriver.is_cloud_exception(error),
- "{} not flagged as cloud exception".format(error))
-
- def test_noncloud_exceptions(self):
- self.assertFalse(
- ec2.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
- "ValueError flagged as cloud exception")