17465: Synchronizes storage classes data between Collection & BlockManager.
author: Lucas Di Pentima <lucas.dipentima@curii.com>
Wed, 2 Jun 2021 19:54:39 +0000 (16:54 -0300)
committer: Lucas Di Pentima <lucas.dipentima@curii.com>
Wed, 2 Jun 2021 19:54:39 +0000 (16:54 -0300)
Storage classes can be set when a Collection is instantiated. BlockManager
used to copy that value once, when Collection instantiated it. However, the
desired storage classes can change during the lifetime of a Collection
instance, and new blocks must be written to the currently desired classes.
The solution is to pass BlockManager a function that it can call to query
its parent Collection instance's settings at any time.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/tests/test_collections.py

index 7c6b732d36fcefc5a88896382ed99b61b83f9d4d..e915ff2ac0a37c86c635f9ce68f52d227fed1725 100644 (file)
@@ -481,7 +481,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=[]):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -491,13 +491,10 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        if put_threads:
-            self.num_put_threads = put_threads
-        else:
-            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
-        self.storage_classes = storage_classes
+        self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
@@ -556,9 +553,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -573,7 +570,7 @@ class _BlockManager(object):
 
                 # If we don't limit the Queue size, the upload queue can quickly
                 # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
+                # generating data more quickly than it can be sent to the Keep
                 # servers.
                 #
                 # With two upload threads and a queue size of 2, this means up to 4
@@ -727,9 +724,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
index 319046dc01aebbcf4d0496531be5e86c25c8f783..3910b224305a87e3fa292d21c97827aa36b2049f 100644 (file)
@@ -1300,12 +1300,16 @@ class Collection(RichCollectionBase):
           storage class.
 
         """
+
+        if storage_classes_desired and type(storage_classes_desired) is not list:
+            raise errors.ArgumentError("storage_classes_desired must be list type.")
+
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
-        self.storage_classes_desired = storage_classes_desired
+        self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
 
         if apiconfig:
@@ -1343,6 +1347,9 @@ class Collection(RichCollectionBase):
             except (IOError, errors.SyntaxError) as e:
                 raise errors.ArgumentError("Error processing manifest text: %s", e)
 
+    def storage_classes_desired(self):
+        return self._storage_classes_desired or []
+
     def root_collection(self):
         return self
 
@@ -1418,7 +1425,7 @@ class Collection(RichCollectionBase):
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
             classes = self.storage_classes_desired or []
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes=classes)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1442,8 +1449,8 @@ class Collection(RichCollectionBase):
         # replication_desired and storage_classes_desired from the API server
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
-        if self.storage_classes_desired is None:
-            self.storage_classes_desired = self._api_response.get('storage_classes_desired', None)
+        if self._storage_classes_desired is None:
+            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
 
     def _populate(self):
         if self._manifest_text is None:
@@ -1583,14 +1590,9 @@ class Collection(RichCollectionBase):
         body={}
         if properties:
             body["properties"] = properties
-        desired_classes = storage_classes
-        # Instance level storage_classes takes precedence over argument.
-        if self.storage_classes_desired:
-            if desired_classes and self.storage_classes_desired != desired_classes:
-                _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
-            desired_classes = self.storage_classes_desired
-        if desired_classes:
-            body["storage_classes_desired"] = desired_classes
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
+            body["storage_classes_desired"] = self.storage_classes_desired()
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
@@ -1693,6 +1695,9 @@ class Collection(RichCollectionBase):
             self._copy_remote_blocks(remote_blocks={})
             self._has_remote_blocks = False
 
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1708,14 +1713,8 @@ class Collection(RichCollectionBase):
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
-            desired_classes = storage_classes
-            # Instance level storage_classes takes precedence over argument.
-            if self.storage_classes_desired:
-                if desired_classes and self.storage_classes_desired != desired_classes:
-                    _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
-                desired_classes = self.storage_classes_desired
-            if desired_classes:
-                body["storage_classes_desired"] = desired_classes
+            if self.storage_classes_desired():
+                body["storage_classes_desired"] = self.storage_classes_desired()
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
index eaf68ec1f5b5afa9ab2e371d9058ae48a763bc20..f821ff952f7a45f913538c890ffc40d397b04ada 100644 (file)
@@ -915,7 +915,20 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         loc = c1.manifest_locator()
         c2 = Collection(loc)
         self.assertEqual(c1.manifest_text, c2.manifest_text)
-        self.assertEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+        self.assertEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
+
+    def test_storage_classes_change_after_save(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, storage_classes_desired=['archival'])
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc)
+        self.assertEqual(['archival'], c2.storage_classes_desired())
+        c2.save(storage_classes=['highIO'])
+        self.assertEqual(['highIO'], c2.storage_classes_desired())
+        c3 = Collection(loc)
+        self.assertEqual(c1.manifest_text, c3.manifest_text)
+        self.assertEqual(['highIO'], c3.storage_classes_desired())
 
     def test_storage_classes_desired_not_loaded_if_provided(self):
         m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
@@ -924,7 +937,7 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         loc = c1.manifest_locator()
         c2 = Collection(loc, storage_classes_desired=['default'])
         self.assertEqual(c1.manifest_text, c2.manifest_text)
-        self.assertNotEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+        self.assertNotEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
 
     def test_init_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt