DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=[]):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
- self.storage_classes = storage_classes
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
+ # generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
storage class.
"""
+
+ if storage_classes_desired and type(storage_classes_desired) is not list:
+ raise errors.ArgumentError("storage_classes_desired must be list type.")
+
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
self._block_manager = block_manager
self.replication_desired = replication_desired
- self.storage_classes_desired = storage_classes_desired
+ self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
if apiconfig:
except (IOError, errors.SyntaxError) as e:
raise errors.ArgumentError("Error processing manifest text: %s", e)
+ def storage_classes_desired(self):
+     """Return the list of desired storage classes, or [] when none are set.
+
+     Reads self._storage_classes_desired, which may come from the
+     constructor argument, the API record, or a later save(storage_classes=...).
+     """
+ return self._storage_classes_desired or []
+
def root_collection(self):
return self
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
classes = self.storage_classes_desired or []
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes=classes)
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
return self._block_manager
def _remember_api_response(self, response):
# replication_desired and storage_classes_desired from the API server
if self.replication_desired is None:
self.replication_desired = self._api_response.get('replication_desired', None)
- if self.storage_classes_desired is None:
- self.storage_classes_desired = self._api_response.get('storage_classes_desired', None)
+ if self._storage_classes_desired is None:
+ self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
def _populate(self):
if self._manifest_text is None:
body={}
if properties:
body["properties"] = properties
- desired_classes = storage_classes
- # Instance level storage_classes takes precedence over argument.
- if self.storage_classes_desired:
- if desired_classes and self.storage_classes_desired != desired_classes:
- _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
- desired_classes = self.storage_classes_desired
- if desired_classes:
- body["storage_classes_desired"] = desired_classes
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
+ body["storage_classes_desired"] = self.storage_classes_desired()
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
self._copy_remote_blocks(remote_blocks={})
self._has_remote_blocks = False
+ if storage_classes:
+ self._storage_classes_desired = storage_classes
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
body["owner_uuid"] = owner_uuid
if properties:
body["properties"] = properties
- desired_classes = storage_classes
- # Instance level storage_classes takes precedence over argument.
- if self.storage_classes_desired:
- if desired_classes and self.storage_classes_desired != desired_classes:
- _logger.warning("Storage classes already set to {}".format(self.storage_classes_desired))
- desired_classes = self.storage_classes_desired
- if desired_classes:
- body["storage_classes_desired"] = desired_classes
+ if self.storage_classes_desired():
+ body["storage_classes_desired"] = self.storage_classes_desired()
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
loc = c1.manifest_locator()
c2 = Collection(loc)
self.assertEqual(c1.manifest_text, c2.manifest_text)
- self.assertEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+ self.assertEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
+
+ def test_storage_classes_change_after_save(self):
+     # save(storage_classes=...) must both update the live object's desired
+     # classes and persist them, so a freshly loaded copy sees the new value.
+ m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+ c1 = Collection(m, storage_classes_desired=['archival'])
+ c1.save_new()
+ loc = c1.manifest_locator()
+ c2 = Collection(loc)
+ self.assertEqual(['archival'], c2.storage_classes_desired())
+ c2.save(storage_classes=['highIO'])
+ self.assertEqual(['highIO'], c2.storage_classes_desired())
+ c3 = Collection(loc)
+     # NOTE(review): manifest_text is a method; this compares bound methods,
+     # not manifest strings — probably meant manifest_text(). Same pattern
+     # exists in neighboring tests; confirm Collection.__eq__ semantics.
+ self.assertEqual(c1.manifest_text, c3.manifest_text)
+ self.assertEqual(['highIO'], c3.storage_classes_desired())
def test_storage_classes_desired_not_loaded_if_provided(self):
m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
loc = c1.manifest_locator()
c2 = Collection(loc, storage_classes_desired=['default'])
self.assertEqual(c1.manifest_text, c2.manifest_text)
- self.assertNotEqual(c1.storage_classes_desired, c2.storage_classes_desired)
+ self.assertNotEqual(c1.storage_classes_desired(), c2.storage_classes_desired())
def test_init_manifest(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt