Merge branch '11345-nodemanager-retry-after' refs #11345
author     Peter Amstutz <peter.amstutz@curoverse.com>
           Sat, 10 Jun 2017 01:30:44 +0000 (21:30 -0400)
committer  Peter Amstutz <peter.amstutz@curoverse.com>
           Sat, 10 Jun 2017 01:30:44 +0000 (21:30 -0400)
13 files changed:
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/azure.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/test/fake_driver.py
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_computenode_driver_azure.py
services/nodemanager/tests/test_computenode_driver_ec2.py

index 20b274b1587f6acbf276b193dda89370956a44ec..d2c3d0c73ea90951e050528fd16f571e8b1d6b76 100644 (file)
@@ -8,6 +8,9 @@ import itertools
 import re
 import time
 
+from ..config import CLOUD_ERRORS
+from libcloud.common.exceptions import BaseHTTPError
+
 ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
 ARVADOS_TIMESUBSEC_RE = re.compile(r'(\.\d+)Z$')
 
@@ -70,43 +73,66 @@ class RetryMixin(object):
             @functools.wraps(orig_func)
             def retry_wrapper(self, *args, **kwargs):
                 while True:
+                    should_retry = False
                     try:
                         ret = orig_func(self, *args, **kwargs)
+                    except BaseHTTPError as error:
+                        if error.headers and error.headers.get("retry-after"):
+                            try:
+                                self.retry_wait = int(error.headers["retry-after"])
+                                if self.retry_wait < 0 or self.retry_wait > self.max_retry_wait:
+                                    self.retry_wait = self.max_retry_wait
+                                should_retry = True
+                            except ValueError:
+                                pass
+                        if error.code == 429 or error.code >= 500:
+                            should_retry = True
+                    except CLOUD_ERRORS as error:
+                        should_retry = True
+                    except errors as error:
+                        should_retry = True
                     except Exception as error:
-                        if not (isinstance(error, errors) or
-                                self._cloud.is_cloud_exception(error)):
-                            self.retry_wait = self.min_retry_wait
-                            self._logger.warning(
-                                "Re-raising unknown error (no retry): %s",
-                                error, exc_info=error)
-                            raise
-
-                        self._logger.warning(
-                            "Client error: %s - %s %s seconds",
-                            error,
-                            "scheduling retry in" if self._timer else "sleeping",
-                            self.retry_wait,
-                            exc_info=error)
-
-                        if self._timer:
-                            start_time = time.time()
-                            # reschedule to be called again
-                            self._timer.schedule(start_time + self.retry_wait,
-                                                 getattr(self._later,
-                                                         orig_func.__name__),
-                                                 *args, **kwargs)
-                        else:
-                            # sleep on it.
-                            time.sleep(self.retry_wait)
-
-                        self.retry_wait = min(self.retry_wait * 2,
-                                              self.max_retry_wait)
-                        if self._timer:
-                            # expect to be called again by timer so don't loop
-                            return
+                        # As a workaround for libcloud drivers that don't
+                        # use typed exceptions, treat bare Exception()
+                        # objects as retryable.
+                        should_retry = type(error) is Exception
                     else:
+                        # No exception: reset the backoff and return.
                         self.retry_wait = self.min_retry_wait
                         return ret
+
+                    # An exception was caught above; decide whether to retry or re-raise.
+                    if not should_retry:
+                        self.retry_wait = self.min_retry_wait
+                        self._logger.warning(
+                            "Re-raising error (no retry): %s",
+                            error, exc_info=error)
+                        raise
+
+                    self._logger.warning(
+                        "Client error: %s - %s %s seconds",
+                        error,
+                        "scheduling retry in" if self._timer else "sleeping",
+                        self.retry_wait,
+                        exc_info=error)
+
+                    if self._timer:
+                        start_time = time.time()
+                        # reschedule to be called again
+                        self._timer.schedule(start_time + self.retry_wait,
+                                             getattr(self._later,
+                                                     orig_func.__name__),
+                                             *args, **kwargs)
+                    else:
+                        # sleep on it.
+                        time.sleep(self.retry_wait)
+
+                    self.retry_wait = min(self.retry_wait * 2,
+                                          self.max_retry_wait)
+                    if self._timer:
+                        # expect to be called again by timer so don't loop
+                        return
+
             return retry_wrapper
         return decorator
 
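For reference, the Retry-After handling added in retry_wrapper above reduces to the following standalone helper. This is a sketch for illustration only; the function name and example values are not part of the commit:

    def retry_wait_from_headers(headers, max_retry_wait=180):
        """Return a clamped wait in seconds, or None if no usable header."""
        value = (headers or {}).get("retry-after")
        if not value:
            return None
        try:
            wait = int(value)
        except ValueError:
            return None              # non-numeric value: no retry hint
        if wait < 0 or wait > max_retry_wait:
            wait = max_retry_wait    # clamp bogus or oversized values
        return wait

    # retry_wait_from_headers({"retry-after": "12"}) == 12
    # retry_wait_from_headers({"retry-after": "-5"}) == 180
    # retry_wait_from_headers({"retry-after": "soon"}) is None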
index 63dac3f0edd0b4938b55c945001e07ceb270da25..4463ec6f53ff6adb8fdf82918308d04951f607dd 100644 (file)
@@ -8,6 +8,8 @@ import time
 import re
 
 import libcloud.common.types as cloud_types
+from libcloud.common.exceptions import BaseHTTPError
+
 import pykka
 
 from .. import \
@@ -126,7 +128,12 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         try:
             self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                       self.arvados_node)
-        except Exception as e:
+        except BaseHTTPError as e:
+            if e.code == 429 or "RequestLimitExceeded" in e.message:
+                # Don't consider API rate limits to be quota errors.
+                # Re-raise so the Retry logic applies.
+                raise
+
             # The set of possible error codes / messages isn't documented for
             # all clouds, so use a keyword heuristic to determine if the
             # failure is likely due to a quota.
@@ -136,7 +143,10 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                 self._finished()
                 return
             else:
+                # Something else happened, re-raise so the Retry logic applies.
                 raise
+        except Exception as e:
+            # Anything else is unexpected; re-raise so the Retry logic applies.
+            raise
 
         # The information included in the node size object we get from libcloud
         # is inconsistent between cloud drivers.  Replace libcloud NodeSize
@@ -272,7 +282,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self._finished(success_flag=True)
 
 
-class ComputeNodeUpdateActor(config.actor_class):
+class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
     """Actor to dispatch one-off cloud management requests.
 
     This actor receives requests for small cloud updates, and
@@ -281,12 +291,12 @@ class ComputeNodeUpdateActor(config.actor_class):
     dedicated actor for this gives us the opportunity to control the
     flow of requests; e.g., by backing off when errors occur.
     """
-    def __init__(self, cloud_factory, max_retry_wait=180):
+    def __init__(self, cloud_factory, timer_actor, max_retry_wait=180):
         super(ComputeNodeUpdateActor, self).__init__()
+        RetryMixin.__init__(self, 1, max_retry_wait,
+                            None, cloud_factory(), timer_actor)
         self._cloud = cloud_factory()
-        self.max_retry_wait = max_retry_wait
-        self.error_streak = 0
-        self.next_request_time = time.time()
+        self._later = self.actor_ref.tell_proxy()
 
     def _set_logger(self):
         self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
@@ -294,28 +304,7 @@ class ComputeNodeUpdateActor(config.actor_class):
     def on_start(self):
         self._set_logger()
 
-    def _throttle_errors(orig_func):
-        @functools.wraps(orig_func)
-        def throttle_wrapper(self, *args, **kwargs):
-            throttle_time = self.next_request_time - time.time()
-            if throttle_time > 0:
-                time.sleep(throttle_time)
-            self.next_request_time = time.time()
-            try:
-                result = orig_func(self, *args, **kwargs)
-            except Exception as error:
-                if self._cloud.is_cloud_exception(error):
-                    self.error_streak += 1
-                    self.next_request_time += min(2 ** self.error_streak,
-                                                  self.max_retry_wait)
-                self._logger.warn(
-                    "Unhandled exception: %s", error, exc_info=error)
-            else:
-                self.error_streak = 0
-                return result
-        return throttle_wrapper
-
-    @_throttle_errors
+    @RetryMixin._retry()
     def sync_node(self, cloud_node, arvados_node):
         return self._cloud.sync_node(cloud_node, arvados_node)
 
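The net effect of this hunk is that ComputeNodeUpdateActor drops its private `_throttle_errors` throttle and reuses the RetryMixin backoff, which is why its constructor now requires a timer actor (see the launcher.py hunk below). Here is a self-contained sketch of the mixin-decorator pattern being adopted, with simplified names; the real RetryMixin additionally honors Retry-After headers and can reschedule through the timer actor instead of sleeping:

    import functools
    import time

    class BackoffMixin(object):
        def __init__(self, min_wait=1, max_wait=180):
            self.retry_wait = self.min_retry_wait = min_wait
            self.max_retry_wait = max_wait

        @staticmethod
        def _retry(errors=(Exception,)):
            def decorator(orig_func):
                @functools.wraps(orig_func)
                def wrapper(self, *args, **kwargs):
                    while True:
                        try:
                            result = orig_func(self, *args, **kwargs)
                        except errors:
                            time.sleep(self.retry_wait)   # back off, then loop
                            self.retry_wait = min(self.retry_wait * 2,
                                                  self.max_retry_wait)
                        else:
                            self.retry_wait = self.min_retry_wait
                            return result
                return wrapper
            return decorator

    class Updater(BackoffMixin):
        @BackoffMixin._retry((IOError,))
        def sync_node(self, node):
            return node  # stand-in for the real cloud call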
index 0c8ddc29eb6ca7b37ebc947addfffbf08dc7ab46..11cc4e5384637a2def7a4235582cb402818dd0cc 100644 (file)
@@ -39,7 +39,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
 
-    @RetryMixin._retry((subprocess.CalledProcessError,))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def cancel_shutdown(self, reason, try_resume=True):
         if self._nodename:
             if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
@@ -51,7 +51,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
 
-    @RetryMixin._retry((subprocess.CalledProcessError,))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def issue_slurm_drain(self):
         if self.cancel_reason is not None:
             return
@@ -62,7 +62,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         else:
             self._later.shutdown_node()
 
-    @RetryMixin._retry((subprocess.CalledProcessError,))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def await_slurm_drain(self):
         if self.cancel_reason is not None:
             return
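Adding OSError to these retry tuples matters because subprocess raises OSError when a command cannot be started at all (e.g. scontrol briefly missing from PATH), which is distinct from the nonzero-exit case that raises CalledProcessError. A quick illustration (the command here is hypothetical, not from the commit):

    import subprocess

    try:
        subprocess.check_output(["scontrol", "ping"])
    except subprocess.CalledProcessError:
        pass  # command ran and exited nonzero -- was already retried
    except OSError:
        pass  # command never started (e.g. ENOENT) -- now retried as well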
index 442034170a4ffee4473eadb6c439e43f5e7e00d3..c8c54dc0bfb9384c3f79545309fff86433409133 100644 (file)
@@ -6,10 +6,9 @@ import logging
 from operator import attrgetter
 
 import libcloud.common.types as cloud_types
-from libcloud.common.exceptions import BaseHTTPError
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
-from ...config import NETWORK_ERRORS
+from ...config import CLOUD_ERRORS
 from .. import RetryMixin
 
 class BaseComputeNodeDriver(RetryMixin):
@@ -25,7 +24,7 @@ class BaseComputeNodeDriver(RetryMixin):
     Subclasses must implement arvados_create_kwargs, sync_node,
     node_fqdn, and node_start_time.
     """
-    CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError, BaseHTTPError)
+
 
     @RetryMixin._retry()
     def _create_driver(self, driver_class, **auth_kwargs):
@@ -169,7 +168,7 @@ class BaseComputeNodeDriver(RetryMixin):
             kwargs.update(self.arvados_create_kwargs(size, arvados_node))
             kwargs['size'] = size
             return self.real.create_node(**kwargs)
-        except self.CLOUD_ERRORS as create_error:
+        except CLOUD_ERRORS as create_error:
             # Workaround for bug #6702: sometimes the create node request
             # succeeds but times out and raises an exception instead of
             # returning a result.  If this happens, we get stuck in a retry
@@ -206,18 +205,10 @@ class BaseComputeNodeDriver(RetryMixin):
         # seconds since the epoch UTC.
         raise NotImplementedError("BaseComputeNodeDriver.node_start_time")
 
-    @classmethod
-    def is_cloud_exception(cls, exception):
-        # libcloud compute drivers typically raise bare Exceptions to
-        # represent API errors.  Return True for any exception that is
-        # exactly an Exception, or a better-known higher-level exception.
-        return (isinstance(exception, cls.CLOUD_ERRORS) or
-                type(exception) is Exception)
-
     def destroy_node(self, cloud_node):
         try:
             return self.real.destroy_node(cloud_node)
-        except self.CLOUD_ERRORS as destroy_error:
+        except CLOUD_ERRORS as destroy_error:
             # Sometimes the destroy node request succeeds but times out and
             # raises an exception instead of returning success.  If this
             # happens, we get a noisy stack trace.  Check if the node is still
index e293d1bebeb5a479b69ff3e22784b9a467b17dd2..c707c2a9f7bc2c274b2a4d295a2f59139dc47ef5 100644 (file)
@@ -17,7 +17,6 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
 
     DEFAULT_DRIVER = cloud_provider.get_driver(cloud_types.Provider.AZURE_ARM)
     SEARCH_CACHE = {}
-    CLOUD_ERRORS = BaseComputeNodeDriver.CLOUD_ERRORS + (BaseHTTPError,)
 
     def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
                  driver_class=DEFAULT_DRIVER):
index f884295e37c7556976ce35ba186954936cc22ed4..a16e0a8e62bac7930c3c4e6dfcded4cca21b2c3a 100644 (file)
@@ -14,11 +14,15 @@ from apiclient import errors as apierror
 
 from .baseactor import BaseNodeManagerActor
 
+from libcloud.common.types import LibcloudError
+from libcloud.common.exceptions import BaseHTTPError
+
 # IOError is the base class for socket.error, ssl.SSLError, and friends.
 # It seems like it hits the sweet spot for operations we want to retry:
 # it's low-level, but unlikely to catch code bugs.
 NETWORK_ERRORS = (IOError,)
 ARVADOS_ERRORS = NETWORK_ERRORS + (apierror.Error,)
+CLOUD_ERRORS = NETWORK_ERRORS + (LibcloudError, BaseHTTPError)
 
 actor_class = BaseNodeManagerActor
 
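Hoisting CLOUD_ERRORS into config.py lets the compute-node package and the drivers catch one shared tuple instead of each maintaining its own class attribute. A usage sketch; `create_with_logging` is a hypothetical helper, not part of the commit:

    from arvnodeman.config import CLOUD_ERRORS

    def create_with_logging(driver, logger, **kwargs):
        # One except clause now covers network errors (IOError and its
        # subclasses), LibcloudError, and BaseHTTPError.
        try:
            return driver.create_node(**kwargs)
        except CLOUD_ERRORS as error:
            logger.warning("Transient cloud failure (will retry): %s", error)
            raise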
index 3cd097a628e827145c5a59157da31edf6c6b161c..72a285baed4685d5709cb9310eadf1ff9803dd9c 100644 (file)
@@ -128,7 +128,7 @@ def main(args=None):
         server_calculator = build_server_calculator(config)
         timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
             launch_pollers(config, server_calculator)
-        cloud_node_updater = node_update.start(config.new_cloud_client).tell_proxy()
+        cloud_node_updater = node_update.start(config.new_cloud_client, timer).tell_proxy()
         node_daemon = NodeManagerDaemonActor.start(
             job_queue_poller, arvados_node_poller, cloud_node_poller,
             cloud_node_updater, timer,
index 1785e0559ee8bd74a9045907088d54a19e55e946..8e7cf2ffcca77d78d176cdaa1091536b6ac6ddc6 100644 (file)
@@ -97,3 +97,31 @@ class FailingDriver(FakeDriver):
                     ex_tags=None,
                     ex_network=None):
         raise Exception("nope")
+
+class RetryDriver(FakeDriver):
+    def create_node(self, name=None,
+                    size=None,
+                    image=None,
+                    auth=None,
+                    ex_storage_account=None,
+                    ex_customdata=None,
+                    ex_resource_group=None,
+                    ex_user_name=None,
+                    ex_tags=None,
+                    ex_network=None):
+        global create_calls
+        create_calls += 1
+        if create_calls < 2:
+            raise BaseHTTPError(429, "Rate limit exceeded",
+                                {'retry-after': '12'})
+        else:
+            return super(RetryDriver, self).create_node(name=name,
+                    size=size,
+                    image=image,
+                    auth=auth,
+                    ex_storage_account=ex_storage_account,
+                    ex_customdata=ex_customdata,
+                    ex_resource_group=ex_resource_group,
+                    ex_user_name=ex_user_name,
+                    ex_tags=ex_tags,
+                    ex_network=ex_network)
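Note that RetryDriver assumes fake_driver.py imports BaseHTTPError and initializes a module-level `create_calls = 0` counter outside this hunk. The intended behavior, which the integration test below asserts from the logs:

    # call 1: raises BaseHTTPError(429, "Rate limit exceeded",
    #         {'retry-after': '12'}); RetryMixin logs
    #         "... scheduling retry in 12 seconds" and tries again
    # call 2: falls through to FakeDriver.create_node and succeeds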
index 6bd7fd10323cefa7621f446e03866009efbf9162..f024b0cffecb967a5c863a685d92895eb36e43ee 100755 (executable)
@@ -331,6 +331,16 @@ def main():
              "34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
              "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
+         }),
+        "test_retry_create": (
+            [
+                (r".*Daemon started", set_squeue),
+                (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
+                (r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
+            ],
+            {},
+            "arvnodeman.test.fake_driver.RetryDriver",
+            {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
          })
     }
 
index b950cc1169c23b6990ae8cc4f0dd0aafe69914ba..598b293420b39f2bd1213be106aad4cf0937e7bc 100644 (file)
@@ -28,7 +28,6 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.api_client.nodes().update().execute.side_effect = arvados_effect
         self.cloud_client = mock.MagicMock(name='cloud_client')
         self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
-        self.cloud_client.is_cloud_exception = BaseComputeNodeDriver.is_cloud_exception
 
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
@@ -277,7 +276,8 @@ class ComputeNodeUpdateActorTestCase(testutil.ActorTestMixin,
 
     def make_actor(self):
         self.driver = mock.MagicMock(name='driver_mock')
-        self.updater = self.ACTOR_CLASS.start(self.driver).proxy()
+        self.timer = mock.MagicMock(name='timer_mock')
+        self.updater = self.ACTOR_CLASS.start(self.driver, self.timer).proxy()
 
     def test_node_sync(self, *args):
         self.make_actor()
index e1def28d8b8e7f5a34b9ff57dfccab9bedfb0cea..42189800a2a459948ab7de047aedd15782d9d50c 100644 (file)
@@ -23,11 +23,15 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         for s in args:
             self.assertIn(s, slurm_cmd)
 
-    def check_success_after_reset(self, proc_mock, end_state='drain\n'):
+    def check_success_after_reset(self, proc_mock, end_state='drain\n', timer=False):
         self.make_mocks(arvados_node=testutil.arvados_node_mock(63))
+        if not timer:
+            self.timer = testutil.MockTimer(False)
         self.make_actor()
         self.check_success_flag(None, 0)
+        self.timer.deliver()
         self.check_success_flag(None, 0)
+        self.timer.deliver()
         # Order is critical here: if the mock gets called when no return value
         # or side effect is set, we may invoke a real subprocess.
         proc_mock.return_value = end_state
@@ -85,7 +89,7 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
 
     def test_issue_slurm_drain_retry(self, proc_mock):
         proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
-        self.check_success_after_reset(proc_mock)
+        self.check_success_after_reset(proc_mock, timer=False)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
         proc_mock.return_value = 'drain\n'
index 702688d88fd51052ff9704755b97d96de0bc50be..c4bc680d2ec87431484a8190a9dd5fda85e765ad 100644 (file)
@@ -69,19 +69,6 @@ class AzureComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase
         node.extra = {'tags': {"hostname": name}}
         self.assertEqual(name, azure.ComputeNodeDriver.node_fqdn(node))
 
-    def test_cloud_exceptions(self):
-        for error in [Exception("test exception"),
-                      IOError("test exception"),
-                      ssl.SSLError("test exception"),
-                      cloud_types.LibcloudError("test exception")]:
-            self.assertTrue(azure.ComputeNodeDriver.is_cloud_exception(error),
-                            "{} not flagged as cloud exception".format(error))
-
-    def test_noncloud_exceptions(self):
-        self.assertFalse(
-            azure.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
-            "ValueError flagged as cloud exception")
-
     def test_sync_node(self):
         arv_node = testutil.arvados_node_mock(1)
         cloud_node = testutil.cloud_node_mock(2)
index a778cd541d690159a916868f6729a49da674d629..14df3602313f8e1c249811395d0809dc57f961ee 100644 (file)
@@ -96,16 +96,3 @@ class EC2ComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         node = testutil.cloud_node_mock()
         node.name = name
         self.assertEqual(name, ec2.ComputeNodeDriver.node_fqdn(node))
-
-    def test_cloud_exceptions(self):
-        for error in [Exception("test exception"),
-                      IOError("test exception"),
-                      ssl.SSLError("test exception"),
-                      cloud_types.LibcloudError("test exception")]:
-            self.assertTrue(ec2.ComputeNodeDriver.is_cloud_exception(error),
-                            "{} not flagged as cloud exception".format(error))
-
-    def test_noncloud_exceptions(self):
-        self.assertFalse(
-            ec2.ComputeNodeDriver.is_cloud_exception(ValueError("test error")),
-            "ValueError flagged as cloud exception")