17465: Adds storage classes tracking and old cluster support.
author Lucas Di Pentima <lucas.dipentima@curii.com>
Thu, 27 May 2021 21:38:30 +0000 (18:38 -0300)
committer Lucas Di Pentima <lucas.dipentima@curii.com>
Thu, 27 May 2021 21:38:30 +0000 (18:38 -0300)
* Tracks which classes were confirmed and asks for the remaining ones on
  subsequent retries.
* When receiving a keepstore response without the proper header, it falls back
  to only saving the requested number of copies.
* Updates tests to reflect code changes.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py

index eac25e9d3f65dc57f1e9e9d1fbd3734152800814..b11578f4cc02e41573c64764eb828856db6bd1e5 100644 (file)
@@ -4,6 +4,7 @@
 
 from __future__ import absolute_import
 from __future__ import division
+import copy
 from future import standard_library
 from future.utils import native_str
 standard_library.install_aliases()
@@ -446,7 +447,9 @@ class KeepClient(object):
                 return None
             return self._result['body']
 
-        def put(self, hash_s, body, timeout=None):
+        def put(self, hash_s, body, timeout=None, headers={}):
+            put_headers = copy.copy(self.put_headers)
+            put_headers.update(headers)
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
@@ -470,7 +473,7 @@ class KeepClient(object):
                     curl.setopt(pycurl.INFILESIZE, len(body))
                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
                     curl.setopt(pycurl.HTTPHEADER, [
-                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
+                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     if self.insecure:
@@ -566,18 +569,22 @@ class KeepClient(object):
             self.successful_copies = 0
             self.confirmed_storage_classes = {}
             self.response = None
+            self.storage_classes_tracking = True
             self.queue_data_lock = threading.Lock()
-            self.pending_tries = copies
+            self.pending_tries = max(copies, len(classes))
             self.pending_tries_notification = threading.Condition()
 
         def write_success(self, response, replicas_nr, classes_confirmed):
             with self.queue_data_lock:
                 self.successful_copies += replicas_nr
-                for st_class, st_copies in classes_confirmed.items():
-                    try:
-                        self.confirmed_storage_classes[st_class] += st_copies
-                    except KeyError:
-                        self.confirmed_storage_classes[st_class] = st_copies
+                if classes_confirmed is None:
+                    self.storage_classes_tracking = False
+                elif self.storage_classes_tracking:
+                    for st_class, st_copies in classes_confirmed.items():
+                        try:
+                            self.confirmed_storage_classes[st_class] += st_copies
+                        except KeyError:
+                            self.confirmed_storage_classes[st_class] = st_copies
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
@@ -591,12 +598,22 @@ class KeepClient(object):
             with self.queue_data_lock:
                 return self.wanted_copies - self.successful_copies
 
+        def satisfied_classes(self):
+            with self.queue_data_lock:
+                if not self.storage_classes_tracking:
+                    # Notifies disabled storage classes expectation to
+                    # the outer loop.
+                    return None
+            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
+
         def pending_classes(self):
             with self.queue_data_lock:
-                unsatisfied_classes = []
+                if self.wanted_storage_classes is None:
+                    return []
+                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                 for st_class, st_copies in self.confirmed_storage_classes.items():
-                    if st_class in self.wanted_storage_classes and st_copies < self.wanted_copies:
-                        unsatisfied_classes.append(st_class)
+                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
+                        unsatisfied_classes.remove(st_class)
                 return unsatisfied_classes
 
         def get_next_task(self):
@@ -614,7 +631,7 @@ class KeepClient(object):
                         while True:
                             self.get_nowait()
                             self.task_done()
-                    elif self.pending_tries > 0 or len(self.pending_classes()) > 0:
+                    elif self.pending_tries > 0:
                         service, service_root = self.get_nowait()
                         if service.finished():
                             self.task_done()
@@ -648,7 +665,7 @@ class KeepClient(object):
             self.total_task_nr += 1
 
         def done(self):
-            return self.queue.successful_copies
+            return self.queue.successful_copies, self.queue.satisfied_classes()
 
         def join(self):
             # Start workers
@@ -690,9 +707,14 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
+            classes = self.queue.pending_classes()
+            headers = {}
+            if len(classes) > 0:
+                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
             success = bool(service.put(self.data_hash,
                                         self.data,
-                                        timeout=self.timeout))
+                                        timeout=self.timeout,
+                                        headers=headers))
             result = service.last_result()
 
             if not success:
@@ -722,7 +744,7 @@ class KeepClient(object):
                         classes_confirmed[stored_class] = int(stored_copies)
             except (KeyError, ValueError):
                 # Storage classes confirmed header missing or corrupt
-                classes_confirmed = {}
+                classes_confirmed = None
 
             return result['body'].strip(), replicas_stored, classes_confirmed
 
@@ -1187,12 +1209,11 @@ class KeepClient(object):
                              arvados.util.new_request_id()),
             'X-Keep-Desired-Replicas': str(copies),
         }
-        if len(classes) > 0:
-            headers['X-Keep-Storage-Classes'] = ', '.join(classes)
         roots_map = {}
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
-        done = 0
+        done_copies = 0
+        done_classes = []
         for tries_left in loop:
             try:
                 sorted_roots = self.map_new_services(
@@ -1204,20 +1225,37 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
+            pending_classes = []
+            if done_classes is not None:
+                pending_classes = list(set(classes) - set(done_classes))
             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
-                                                        copies=copies - done,
+                                                        copies=copies - done_copies,
                                                         max_service_replicas=self.max_replicas_per_service,
                                                         timeout=self.current_timeout(num_retries - tries_left),
-                                                        classes=classes)
+                                                        classes=pending_classes)
             for service_root, ks in [(root, roots_map[root])
                                      for root in sorted_roots]:
                 if ks.finished():
                     continue
                 writer_pool.add_task(ks, service_root)
             writer_pool.join()
-            done += writer_pool.done()
-            loop.save_result((done >= copies, writer_pool.total_task_nr))
+            pool_copies, pool_classes = writer_pool.done()
+            done_copies += pool_copies
+            if (done_classes is not None) and (pool_classes is not None):
+                done_classes += pool_classes
+                loop.save_result(
+                    (done_copies >= copies and set(done_classes) == set(classes),
+                    writer_pool.total_task_nr))
+            else:
+                # Old keepstore contacted without storage classes support:
+                # success is determined only by successful copies.
+                #
+                # Disable storage classes tracking from this point forward.
+                _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
+                done_classes = None
+                loop.save_result(
+                    (done_copies >= copies, writer_pool.total_task_nr))
 
         if loop.success():
             return writer_pool.response()
@@ -1231,7 +1269,7 @@ class KeepClient(object):
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} after {} (wanted {} copies but wrote {})".format(
-                    data_hash, loop.attempts_str(), copies, writer_pool.done()), service_errors, label="service")
+                    data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
 
     def local_store_put(self, data, copies=1, num_retries=None):
         """A stub for put().
index 27e3cf6330728b8fa523d51d2197885cc1229070..cae2308f6fb94e57e3cf088f806800239c5a7eb7 100644 (file)
@@ -70,7 +70,7 @@ class KeepTestCase(run_test_server.TestCaseWithServers):
 
     def test_KeepLongBinaryRWTest(self):
         blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
-        for i in range(0,23):
+        for i in range(0, 23):
             blob_data = blob_data + blob_data
         blob_locator = self.keep_client.put(blob_data)
         self.assertRegex(
@@ -1178,9 +1178,10 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
+            self._result['headers']['x-keep-storage-classes-confirmed'] = 'default={}'.format(replicas)
             self._result['body'] = 'foobar'
 
-        def put(self, data_hash, data, timeout):
+        def put(self, data_hash, data, timeout, headers):
             time.sleep(self.delay)
             if self.will_raise is not None:
                 raise self.will_raise
@@ -1207,7 +1208,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_only_write_enough_on_partial_success(self):
         for i in range(5):
@@ -1216,7 +1217,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_only_write_enough_when_some_crash(self):
         for i in range(5):
@@ -1225,7 +1226,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies)
+        self.assertEqual(self.pool.done(), (self.copies, []))
 
     def test_fail_when_too_many_crash(self):
         for i in range(self.copies+1):
@@ -1235,7 +1236,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
             self.pool.add_task(ks, None)
         self.pool.join()
-        self.assertEqual(self.pool.done(), self.copies-1)
+        self.assertEqual(self.pool.done(), (self.copies-1, []))
 
 
 @tutil.skip_sleep