class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
CollectionWriter lets you build a new Arvados Collection from scratch.
service requests. Default 0. You may change this value
after instantiation, but note those changes may not
propagate to related objects like the Keep client.
+ * replication: The number of copies of each block to store.
+ If this argument is None or not supplied, replication is
+ the server-provided default if available, otherwise 2.
"""
self._api_client = api_client
self.num_retries = num_retries
+ self.replication = (2 if replication is None else replication)
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
data_buffer = ''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
- self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(
+ data_buffer[0:self.KEEP_BLOCK_SIZE],
+ copies=self.replication))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
self._current_file_name = None
def finish(self):
- # Store the manifest in Keep and return its locator.
- return self._my_keep().put(self.manifest_text())
+ """Write this collection's manifest text to Keep; return its locator.
+
+ The manifest block is written with the same replication level
+ (self.replication) that is requested for the data blocks.  This is
+ handy for stashing manifest fragments, such as Crunch task outputs,
+ in Keep for a short while.
+
+ For anything meant to persist, build a real collection instead by
+ sending manifest_text() to the API server's "create collection"
+ endpoint.
+ """
+ keep = self._my_keep()
+ return keep.put(self.manifest_text(), copies=self.replication)
def portable_data_hash(self):
stripped = self.stripped_manifest()
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, *args, **kwargs):
+ """Instantiate a ResumableCollectionWriter.
+
+ Every argument after api_client, positional or keyword, is passed
+ through unchanged to the parent class's __init__ (presumably
+ CollectionWriter.__init__, which takes num_retries and replication).
+ Accepting *args as well as **kwargs keeps the previous positional
+ call form, e.g. ResumableCollectionWriter(api, 3), working, and lets
+ from_state() forward positional *init_args.
+ """
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(
- api_client, num_retries=num_retries)
+ super(ResumableCollectionWriter, self).__init__(api_client, *args, **kwargs)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):