Merge branch '15680-arv-put-retry'
author: Tom Clegg <tclegg@veritasgenetics.com>
Mon, 21 Oct 2019 15:42:39 +0000 (11:42 -0400)
committer: Lucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 7 Nov 2019 18:35:13 +0000 (15:35 -0300)
fixes #15680

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/arvados/retry.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_keep_client.py

index 37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b..6893b94bf78b7b16a1da1a802fdabe419563eb0d 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -500,6 +500,7 @@ class _BlockManager(object):
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self.num_retries = num_retries
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -554,9 +555,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -725,9 +726,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries)
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
index cf1a36f9fdfbbfdf739fe75027d00eaa782df4f2..26902931582244142054d69bc2e52fe596927de3 100644 (file)
@@ -1410,7 +1410,7 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries)
         return self._block_manager
 
     def _remember_api_response(self, response):
index afe75b31056c5c7d872256875a74265012de3b91..5436702b96ecc05571244128f28ebb6c3f2a17d6 100644 (file)
@@ -823,7 +823,9 @@ class ArvPutUploadJob(object):
                                           update_collection):
             try:
                 self._remote_collection = arvados.collection.Collection(
-                    update_collection, api_client=self._api_client)
+                    update_collection,
+                    api_client=self._api_client,
+                    num_retries=self.num_retries)
             except arvados.errors.ApiError as error:
                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
             else:
@@ -866,7 +868,8 @@ class ArvPutUploadJob(object):
                 self._state['manifest'],
                 replication_desired=self.replication_desired,
                 put_threads=self.put_threads,
-                api_client=self._api_client)
+                api_client=self._api_client,
+                num_retries=self.num_retries)
 
     def _cached_manifest_valid(self):
         """
index 4354ced67d3b4a3b678be3cacaa575f58f4f6d3f..86a28f54c402c8d44aba1d8511faab18e5e8b44a 100644 (file)
@@ -1117,7 +1117,7 @@ class KeepClient(object):
                 "{} not found".format(loc_s), service_errors)
         else:
             raise arvados.errors.KeepReadError(
-                "failed to read {}".format(loc_s), service_errors, label="service")
+                "failed to read {} after {}".format(loc_s, loop.attempts_str()), service_errors, label="service")
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None, request_id=None):
@@ -1196,8 +1196,8 @@ class KeepClient(object):
                               for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
-                "failed to write {} (wanted {} copies but wrote {})".format(
-                    data_hash, copies, writer_pool.done()), service_errors, label="service")
+                "failed to write {} after {} (wanted {} copies but wrote {})".format(
+                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
index 3f62ab779f81fa43537a8223b7348bed52dc3a7c..ea4095930fc78f7cbbb26c49f45a8fa66fbb4081 100644 (file)
@@ -64,6 +64,7 @@ class RetryLoop(object):
         self.max_wait = max_wait
         self.next_start_time = 0
         self.results = deque(maxlen=save_results)
+        self._attempts = 0
         self._running = None
         self._success = None
 
@@ -101,6 +102,7 @@ class RetryLoop(object):
                 "recorded a loop result after the loop finished")
         self.results.append(result)
         self._success = self.check_result(result)
+        self._attempts += 1
 
     def success(self):
         """Return the loop's end state.
@@ -118,6 +120,19 @@ class RetryLoop(object):
             raise arvados.errors.AssertionError(
                 "queried loop results before any were recorded")
 
+    def attempts(self):
+        """Return the number of attempts that have been made.
+
+        Includes successes and failures."""
+        return self._attempts
+
+    def attempts_str(self):
+        """Human-readable attempts(): 'N attempts' or '1 attempt'"""
+        if self._attempts == 1:
+            return '1 attempt'
+        else:
+            return '{} attempts'.format(self._attempts)
+
 
 def check_http_response_success(status_code):
     """Convert an HTTP status code to a loop control flag.
index a760255dd6da8e01470dacafa6bcfafeddc50058..086fa542a2b28b2112f92eb49e21934247cf2710 100644 (file)
@@ -865,7 +865,7 @@ class BlockManagerTest(unittest.TestCase):
     def test_bufferblock_commit_pending(self):
         # Test for bug #7225
         mockkeep = mock.MagicMock()
-        mockkeep.put.side_effect = lambda x: time.sleep(1)
+        mockkeep.put.side_effect = lambda *args, **kwargs: time.sleep(1)
         with arvados.arvfile._BlockManager(mockkeep) as blockmanager:
             bufferblock = blockmanager.alloc_bufferblock()
             bufferblock.append("foo")
index d6b3a2a12dc6f4d4d8bb997c25f43cc21e8cda62..f6bcdc7f97c169f63e4565d3fda0f6c30ada7b54 100644 (file)
@@ -489,15 +489,16 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def check_errors_from_last_retry(self, verb, exc_class):
         api_client = self.mock_keep_services(count=2)
         req_mock = tutil.mock_keep_responses(
-            "retry error reporting test", 500, 500, 403, 403)
+            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
         with req_mock, tutil.skip_sleep, \
                 self.assertRaises(exc_class) as err_check:
             keep_client = arvados.KeepClient(api_client=api_client)
             getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                        num_retries=3)
-        self.assertEqual([403, 403], [
+        self.assertEqual([502, 502], [
                 getattr(error, 'status_code', None)
                 for error in err_check.exception.request_errors().values()])
+        self.assertRegex(str(err_check.exception), r'failed to (read|write) .* after 4 attempts')
 
     def test_get_error_reflects_last_retry(self):
         self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
@@ -1096,7 +1097,9 @@ class KeepClientRetryTestMixin(object):
     def check_exception(self, error_class=None, *args, **kwargs):
         if error_class is None:
             error_class = self.DEFAULT_EXCEPTION
-        self.assertRaises(error_class, self.run_method, *args, **kwargs)
+        with self.assertRaises(error_class) as err:
+            self.run_method(*args, **kwargs)
+        return err
 
     def test_immediate_success(self):
         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 200):
@@ -1120,7 +1123,8 @@ class KeepClientRetryTestMixin(object):
 
     def test_error_after_retries_exhausted(self):
         with self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200):
-            self.check_exception(num_retries=1)
+            err = self.check_exception(num_retries=1)
+        self.assertRegex(str(err.exception), r'failed to .* after 2 attempts')
 
     def test_num_retries_instance_fallback(self):
         self.client_kwargs['num_retries'] = 3