3147: Add retry support to Python SDK's KeepClient.
author    Brett Smith <brett@curoverse.com>
Fri, 22 Aug 2014 17:38:40 +0000 (13:38 -0400)
committer Brett Smith <brett@curoverse.com>
Tue, 26 Aug 2014 15:32:00 +0000 (11:32 -0400)
sdk/python/arvados/errors.py
sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py

diff --git a/sdk/python/arvados/errors.py b/sdk/python/arvados/errors.py
index 1d9c77851eacde7f1f9571e697361c1a75a8849b..89910aa60f389f7e61ecfc9d4be6d492f09ad25f 100644
@@ -17,12 +17,14 @@ class SyntaxError(Exception):
     pass
 class AssertionError(Exception):
     pass
-class NotFoundError(Exception):
-    pass
 class CommandFailedError(Exception):
     pass
+class KeepReadError(Exception):
+    pass
 class KeepWriteError(Exception):
     pass
+class NotFoundError(KeepReadError):
+    pass
 class NotImplementedError(Exception):
     pass
 class NoKeepServersError(Exception):
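
With NotFoundError now subclassing the new KeepReadError, callers can
handle any read failure with a single except clause while still treating
missing blocks specially.  A minimal sketch (the fetch helper and its
keep_client argument are illustrative, not part of this change):

    import arvados.errors

    def fetch(keep_client, locator):
        try:
            return keep_client.get(locator)
        except arvados.errors.NotFoundError:
            return None  # the block is genuinely missing
        except arvados.errors.KeepReadError:
            raise  # some other read failure: timeout, 5xx, etc.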
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1fec4c52aa41e66dced077dae96aa9de579..e75d64e57f82c816f2fdf12882583740a74d7d8e 100644
@@ -27,6 +27,7 @@ global_client_object = None
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 
 class KeepLocator(object):
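
The new code leans on the arvados.retry module introduced on this branch.
The pattern it relies on, roughly: RetryLoop iterates until a saved
result passes the check function, retries are exhausted, or the check
signals permanent failure, and check_http_response_success classifies an
httplib2 response as success (True), permanent failure (False), or
temporary failure (None).  A hedged sketch, with the check function and
canned result purely illustrative:

    import arvados.retry as retry

    def check(result):
        # True = success, False = permanent failure, None = try again.
        return True if result == 'ok' else None

    loop = retry.RetryLoop(3, check, backoff_start=2)
    for tries_left in loop:
        result = 'ok'  # stand-in for an actual HTTP request
        loop.save_result(result)
    if loop.success():
        print "request succeeded"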
@@ -200,15 +201,83 @@ class KeepClient(object):
                 return self._done
 
 
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        # When using HTTPS, timeouts surface as ssl.SSLError here.
+        HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+                       ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            return self.success_flag is not False
+
+        def finished(self):
+            return self.success_flag is not None
+
+        def last_status(self):
+            try:
+                return int(self.last_result[0].status)
+            except (AttributeError, IndexError, ValueError):
+                return None
+
+        def get(self, http, locator):
+            # http is an httplib2.Http object.
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = http.request(url.encode('utf-8'), 'GET',
+                                          headers=self.get_headers)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result[1]
+                _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
+            return None
+
+        def put(self, http, hash_s, body):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = http.request(url.encode('utf-8'), 'PUT',
+                                      headers=self.put_headers, body=body)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
+
     class KeepWriterThread(threading.Thread):
         """
         Write a blob of data to the given Keep server. On success, call
         save_response() of the given ThreadLimiter to save the returned
         locator.
         """
-        def __init__(self, api_token, **kwargs):
+        def __init__(self, keep_service, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
-            self._api_token = api_token
+            self.service = keep_service
             self.args = kwargs
             self._success = False
 
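
For context, a rough sketch of how these KeepService objects are driven
by the client code below; the root URL, token, and locator here are
illustrative:

    import httplib2
    from arvados.keep import KeepClient, KeepLocator

    http = httplib2.Http(timeout=60)
    locator = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
    service = KeepClient.KeepService('http://keep0.example:25107/',
                                     Authorization='OAuth2 xyzzy')
    if service.usable():
        blob = service.get(http, locator)
        if blob is None and not service.finished():
            pass  # temporary failure; this service may be retried later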
@@ -224,51 +293,35 @@ class KeepClient(object):
                 self.run_with_limiter(limiter)
 
         def run_with_limiter(self, limiter):
+            if self.service.finished():
+                return
             _logger.debug("KeepWriterThread %s proceeding %s %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
                           self.args['service_root'])
             h = httplib2.Http(timeout=self.args.get('timeout', None))
-            url = self.args['service_root'] + self.args['data_hash']
-            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}
-
-            if self.args['using_proxy']:
-                # We're using a proxy, so tell the proxy how many copies we
-                # want it to store
-                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
-            try:
-                _logger.debug("Uploading to {}".format(url))
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=self.args['data'])
-                if re.match(r'^2\d\d$', resp['status']):
-                    self._success = True
-                    _logger.debug("KeepWriterThread %s succeeded %s %s",
-                                  str(threading.current_thread()),
-                                  self.args['data_hash'],
-                                  self.args['service_root'])
+            self._success = bool(self.service.put(
+                    h, self.args['data_hash'], self.args['data']))
+            status = self.service.last_status()
+            if self._success:
+                resp, body = self.service.last_result
+                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                              str(threading.current_thread()),
+                              self.args['data_hash'],
+                              self.args['service_root'])
+                # Tick the 'done' counter for the number of replicas
+                # reported stored by the server, for the case that
+                # we're talking to a proxy or other backend that
+                # stores multiple copies for us.
+                try:
+                    replicas_stored = int(resp['x-keep-replicas-stored'])
+                except (KeyError, ValueError):
                     replicas_stored = 1
-                    if 'x-keep-replicas-stored' in resp:
-                        # Tick the 'done' counter for the number of replica
-                        # reported stored by the server, for the case that
-                        # we're talking to a proxy or other backend that
-                        # stores to multiple copies for us.
-                        try:
-                            replicas_stored = int(resp['x-keep-replicas-stored'])
-                        except ValueError:
-                            pass
-                    limiter.save_response(content.strip(), replicas_stored)
-                else:
-                    _logger.debug("Request fail: PUT %s => %s %s",
-                                    url, resp['status'], content)
-            except (httplib2.HttpLib2Error,
-                    httplib.HTTPException,
-                    ssl.SSLError) as e:
-                # When using https, timeouts look like ssl.SSLError from here.
-                # "SSLError: The write operation timed out"
-                _logger.debug("Request fail: PUT %s => %s: %s",
-                                url, type(e), str(e))
+                limiter.save_response(body.strip(), replicas_stored)
+            elif status is not None:
+                _logger.debug("Request fail: PUT %s => %s %s",
+                              self.args['data_hash'], status,
+                              self.service.last_result[1])
 
 
     def __init__(self, api_client=None, proxy=None, timeout=60,
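
The replica accounting above means one successful PUT can satisfy more
than one requested copy when a proxy stores multiple replicas.  In
miniature (the response headers are illustrative):

    resp = {'x-keep-replicas-stored': '2'}
    try:
        replicas_stored = int(resp['x-keep-replicas-stored'])
    except (KeyError, ValueError):
        replicas_stored = 1  # a plain Keep server stores one replica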
@@ -323,6 +376,7 @@ class KeepClient(object):
                 self.api_token = api_token
                 self.service_roots = [proxy]
                 self.using_proxy = True
+                self.static_service_roots = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -332,29 +386,35 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self.service_roots = None
                 self.using_proxy = None
+                self.static_service_roots = False
 
-    def shuffled_service_roots(self, hash):
-        if self.service_roots is None:
-            with self.lock:
-                try:
-                    keep_services = self.api_client.keep_services().accessible()
-                except Exception:  # API server predates Keep services.
-                    keep_services = self.api_client.keep_disks().list()
+    def build_service_roots(self, force_rebuild=False):
+        if (self.static_service_roots or
+              (self.service_roots and not force_rebuild)):
+            return
+        with self.lock:
+            try:
+                keep_services = self.api_client.keep_services().accessible()
+            except Exception:  # API server predates Keep services.
+                keep_services = self.api_client.keep_disks().list()
 
-                keep_services = keep_services.execute().get('items')
-                if not keep_services:
-                    raise arvados.errors.NoKeepServersError()
+            keep_services = keep_services.execute().get('items')
+            if not keep_services:
+                raise arvados.errors.NoKeepServersError()
 
-                self.using_proxy = (keep_services[0].get('service_type') ==
-                                    'proxy')
+            self.using_proxy = (keep_services[0].get('service_type') ==
+                                'proxy')
 
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_services)
-                self.service_roots = sorted(set(roots))
-                _logger.debug(str(self.service_roots))
+            roots = (("http%s://%s:%d/" %
+                      ('s' if f['service_ssl_flag'] else '',
+                       f['service_host'],
+                       f['service_port']))
+                     for f in keep_services)
+            self.service_roots = sorted(set(roots))
+            _logger.debug(str(self.service_roots))
+
+    def shuffled_service_roots(self, hash, force_rebuild=False):
+        self.build_service_roots(force_rebuild)
 
         # Build an ordering with which to query the Keep servers based on the
         # contents of the hash.
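
build_service_roots is now idempotent: it returns immediately when a
static proxy root was configured, or when roots are already cached and
force_rebuild is unset.  A usage sketch (the no-argument client setup
assumes the usual ARVADOS_API_* environment):

    import arvados

    kc = arvados.KeepClient()
    kc.build_service_roots()                    # polls the API server
    kc.build_service_roots()                    # no-op: roots cached
    kc.build_service_roots(force_rebuild=True)  # polls again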
@@ -454,113 +514,176 @@ class KeepClient(object):
         finally:
             self._cache_lock.release()
 
-    def get(self, loc_s):
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
+
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
+
+    def get(self, loc_s, num_retries=0):
+        """Get data from Keep.
+
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request to each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
+
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  Default 0.
+        """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
         slot, first = self.reserve_cache(expect_hash)
-
         if not first:
             v = slot.get()
             return v
 
-        try:
-            for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + loc_s
-                headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
-                           'Accept': 'application/octet-stream'}
-                blob = self.get_url(url, headers, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-
-            for hint in locator.hints:
-                if not hint.startswith('K@'):
-                    continue
-                url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
-                blob = self.get_url(url, {}, expect_hash)
-                if blob:
-                    slot.set(blob)
-                    self.cap_cache()
-                    return blob
-        except:
-            slot.set(None)
-            self.cap_cache()
-            raise
-
-        slot.set(None)
+        # See #3147 for a discussion of the loop implementation.  Highlights:
+        # * Refresh the list of Keep services after each failure, in case
+        #   it's being updated.
+        # * Retry until we succeed, we're out of retries, or every available
+        #   service has returned permanent failure.
+        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                      for hint in locator.hints if hint.startswith('K@')]
+        # Map root URLs to their KeepService objects.
+        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        blob = None
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, expect_hash,
+                    force_rebuild=(tries_left < num_retries))
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            # Query KeepService objects that haven't returned
+            # permanent failure, in our specified shuffle order.
+            services_to_try = [roots_map[root]
+                               for root in (local_roots + hint_roots)
+                               if roots_map[root].usable()]
+            http = httplib2.Http(timeout=self.timeout)
+            for keep_service in services_to_try:
+                blob = keep_service.get(http, locator)
+                if blob is not None:
+                    break
+            loop.save_result((blob, len(services_to_try)))
+
+        # Always cache the result, then return it if we succeeded.
+        slot.set(blob)
         self.cap_cache()
-        raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+        if loop.success():
+            return blob
+
+        # No servers fulfilled the request.  Count how many responded
+        # "not found;" if the ratio is high enough (currently 75%), report
+        # Not Found; otherwise a generic error.
+        # Q: Including 403 is necessary for the Keep tests to continue
+        # passing, but maybe they should expect KeepReadError instead?
+        not_founds = sum(1 for ks in roots_map.values()
+                         if ks.last_status() in set([403, 404, 410]))
+        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+            raise arvados.errors.NotFoundError(loc_s)
+        else:
+            raise arvados.errors.KeepReadError(loc_s)
 
-    def get_url(self, url, headers, expect_hash):
-        h = httplib2.Http()
-        try:
-            _logger.debug("Request: GET %s", url)
-            with timer.Timer() as t:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
-                         len(content), t.msecs,
-                         (len(content)/(1024*1024))/t.secs)
-            if re.match(r'^2\d\d$', resp['status']):
-                md5 = hashlib.md5(content).hexdigest()
-                if md5 == expect_hash:
-                    return content
-                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
-        except Exception as e:
-            _logger.debug("Request fail: GET %s => %s: %s",
-                         url, type(e), str(e))
-        return None
-
-    def put(self, data, copies=2):
+    def put(self, data, copies=2, num_retries=0):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server, and
+        send the data to each one simultaneously in a new thread.  Once the
+        uploads are finished, if enough copies are saved, this method returns
+        the most recent HTTP response body.  If requests fail to upload
+        enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Default 0.
+        """
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+
+        headers = {}
+        if self.using_proxy:
+            # Tell the proxy how many copies we want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, data_hash,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.timeout)
+                t.start()
+                threads.append(t)
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                    t.args['service_root'],
-                                    t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
                 t.join()
-        have_copies = thread_limiter.done()
-        # If we're done, return the response from Keep
-        if have_copies >= want_copies:
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
-
+            (data_hash, copies, thread_limiter.done()))
 
     def local_store_put(self, data):
         md5 = hashlib.md5(data).hexdigest()
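
Taken together, callers now opt in to retries per request.  A brief
usage sketch (client configuration and data illustrative):

    import arvados

    kc = arvados.KeepClient()
    loc = kc.put('sample data', copies=2, num_retries=3)
    data = kc.get(loc, num_retries=3)
    assert data == 'sample data'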
diff --git a/sdk/python/tests/test_keep_client.py b/sdk/python/tests/test_keep_client.py
index 6198919e8bbbd1e3cc807d0748cb76364a11fbd0..4ac9df17ecf838b4b65685636af0f8ab5b59c3b1 100644
@@ -1,12 +1,11 @@
-# usage example:
-#
-# ARVADOS_API_TOKEN=abc ARVADOS_API_HOST=arvados.local python -m unittest discover
-
+import mock
 import os
 import unittest
 
 import arvados
+import arvados.retry
 import run_test_server
+from arvados_testutil import fake_httplib2_response
 
 class KeepTestCase(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
@@ -219,3 +218,108 @@ class KeepProxyTestCase(run_test_server.TestCaseWithServers):
                          'baz2',
                          'wrong content from Keep.get(md5("baz2"))')
         self.assertTrue(keep_client.using_proxy)
+
+
+class KeepClientRetryTestMixin(object):
+    # Testing with a local Keep store won't exercise the retry behavior.
+    # Instead, our strategy is:
+    # * Create a client with one proxy specified (pointed at a black
+    #   hole), so there's no need to instantiate an API client, and
+    #   all HTTP requests come from one place.
+    # * Mock httplib2's request method to provide simulated responses.
+    # This lets us test the retry logic extensively without relying on any
+    # supporting servers, and prevents side effects in case something hiccups.
+    # To use this mixin, define DEFAULT_EXPECT, DEFAULT_EXCEPTION, and
+    # run_method().
+    PROXY_ADDR = 'http://[100::]/'
+    TEST_DATA = 'testdata'
+    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'
+
+    @staticmethod
+    def mock_responses(body, *codes):
+        return mock.patch('httplib2.Http.request', side_effect=(
+                (fake_httplib2_response(code), body) for code in codes))
+
+    def new_client(self):
+        return arvados.KeepClient(proxy=self.PROXY_ADDR, local_store='')
+
+    def run_method(self, *args, **kwargs):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def check_success(self, expected=None, *args, **kwargs):
+        if expected is None:
+            expected = self.DEFAULT_EXPECT
+        self.assertEqual(expected, self.run_method(*args, **kwargs))
+
+    def check_exception(self, error_class=None, *args, **kwargs):
+        if error_class is None:
+            error_class = self.DEFAULT_EXCEPTION
+        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+
+    def test_immediate_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_success()
+
+    def test_retry_then_success(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_success(num_retries=3)
+
+    def test_no_default_retry(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 200):
+            self.check_exception()
+
+    def test_no_retry_after_permanent_error(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 403, 200):
+            self.check_exception(num_retries=3)
+
+    def test_error_after_retries_exhausted(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 500, 500, 200):
+            self.check_exception(num_retries=1)
+
+
+# Don't delay from RetryLoop's exponential backoff.
+no_backoff = mock.patch('time.sleep', lambda n: None)
+@no_backoff
+class KeepClientRetryGetTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
+    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
+    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
+
+    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
+                   *args, **kwargs):
+        return self.new_client().get(locator, *args, **kwargs)
+
+    def test_specific_exception_when_not_found(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200):
+            self.check_exception(arvados.errors.NotFoundError, num_retries=3)
+
+    def test_general_exception_with_mixed_errors(self):
+        # get should raise a NotFoundError only if no server returns the
+        # block and a high proportion of servers report that it's not found.
+        # This test rigs up 50/50 disagreement between two servers, and
+        # checks that it does not become a NotFoundError.
+        client = self.new_client()
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 500):
+            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
+                client.get(self.HINTED_LOCATOR)
+            self.assertNotIsInstance(
+                exc_check.exception, arvados.errors.NotFoundError,
+                "mixed errors raised NotFoundError")
+
+    def test_hint_server_can_succeed_without_retries(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 404, 200, 500):
+            self.check_success(locator=self.HINTED_LOCATOR)
+
+
+@no_backoff
+class KeepClientRetryPutTestCase(unittest.TestCase, KeepClientRetryTestMixin):
+    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
+    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
+
+    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
+                   copies=1, *args, **kwargs):
+        return self.new_client().put(data, copies, *args, **kwargs)
+
+    def test_do_not_send_multiple_copies_to_same_server(self):
+        with self.mock_responses(self.DEFAULT_EXPECT, 200):
+            self.check_exception(copies=2, num_retries=3)
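
The tests import fake_httplib2_response from arvados_testutil, which is
not part of this diff.  One plausible minimal implementation, for
reference only (the real helper may differ):

    import httplib2

    def fake_httplib2_response(code, **headers):
        # httplib2.Response accepts a dict and derives .status from the
        # 'status' entry, which is all last_status() needs.
        headers.update(status=str(code))
        return httplib2.Response(headers)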