class CollectionWriter(CollectionBase):
+ # Buffered data is sent to Keep in pieces of at most this many bytes.
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, num_retries=0, replication=0):
"""Instantiate a CollectionWriter.
CollectionWriter lets you build a new Arvados Collection from scratch.
service requests. Default 0. You may change this value
after instantiation, but note those changes may not
propagate to related objects like the Keep client.
+ * replication: The number of copies of each block to store.
+ If this argument is 0 (the default) or negative, 2 copies
+ are stored. No server-provided default is consulted.
"""
self._api_client = api_client
self.num_retries = num_retries
+ # Any non-positive request, including the default 0, falls back to 2.
+ self.replication = replication if replication > 0 else 2
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
+ # Send the first KEEP_BLOCK_SIZE bytes of buffered data to Keep with
+ # self.replication copies, record the returned locator for the current
+ # stream, and keep the remaining bytes as the new buffer.
data_buffer = ''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
- self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(
+ data_buffer[0:self.KEEP_BLOCK_SIZE],
+ copies=self.replication))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
self._current_file_name = None
def finish(self):
- # Store the manifest in Keep and return its locator.
- return self._my_keep().put(self.manifest_text())
+ # Save the manifest text itself as a Keep block, stored with
+ # self.replication copies, and return that block's locator.
+ # This is rarely what you want: it suits special cases such as
+ # parking manifest fragments in Keep during a Crunch job.
+ # Normally you should create a collection instead, by sending
+ # manifest_text() to the API server's "create collection"
+ # endpoint.
+ keep = self._my_keep()
+ return keep.put(self.manifest_text(), copies=self.replication)
def portable_data_hash(self):
+ # NOTE(review): the rest of this method is outside the hunk; it starts
+ # from stripped_manifest(), presumably the manifest without signatures.
stripped = self.stripped_manifest()
+ # NOTE(review): tail of a list of attribute names, presumably the state
+ # ResumableCollectionWriter saves and restores — confirm in full file.
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, num_retries=0, replication=0):
+ """Instantiate a ResumableCollectionWriter.
+
+ All arguments are passed unchanged to CollectionWriter.__init__.
+ They are spelled out (not taken as **kwargs) so that existing
+ callers passing num_retries positionally keep working.
+ """
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(
- api_client, num_retries=num_retries)
+ super(ResumableCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries, replication=replication)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
+ # NOTE(review): body is outside this hunk; init_args/init_kwargs are
+ # presumably forwarded to the constructor — confirm.
# Local storage methods need no-op num_retries arguments to keep
# integration tests happy. With better isolation they could
# probably be removed again.
- def local_store_put(self, data, num_retries=0):
+ def local_store_put(self, data, num_retries=0, copies=1):
+ # Like num_retries, copies is not used in the visible body; it lets
+ # callers that pass copies= (e.g. CollectionWriter) call this method.
md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
os.path.join(self.local_store, md5))
return locator
- def local_store_get(self, loc_s, num_retries=0):
+ def local_store_get(self, loc_s, num_retries=0, copies=1):
+ # NOTE(review): copies has no meaning for a read and is not used in the
+ # visible body; presumably it only mirrors local_store_put — confirm
+ # that some caller needs it.
try:
locator = KeepLocator(loc_s)
except ValueError:
self.mock_keep_services(client, code, self.PROXY_RESPONSE)
return client
+ def disk_services_available(self, number_of_disks):
+ """Build a fake keep_services listing of `number_of_disks` services,
+ all of type 'disk'. Each gets a distinct mock UUID and a distinct
+ port counting down from 65535.
+ """
+ services = [
+ {
+ 'uuid': 'zzzzz-bi6l4-mockdisk%07d' % n,
+ 'owner_uuid': 'zzzzz-tpzed-000000000000000',
+ 'service_host': tutil.TEST_HOST,
+ 'service_port': 65535 - n,
+ 'service_ssl_flag': True,
+ 'service_type': 'disk',
+ }
+ for n in range(number_of_disks)
+ ]
+ return {'items_available': number_of_disks, 'items': services}
+
@tutil.skip_sleep
class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
+ # Extra keyword arguments are forwarded as **headers to
+ # tutil.mock_put_responses.
return tutil.mock_put_responses(body, *codes, **headers)
def foo_writer(self, **kwargs):
- api_client = self.api_client_mock()
- writer = arvados.CollectionWriter(api_client, **kwargs)
+ """Return a CollectionWriter that has written 'foo' into file 'foo'.
+
+ kwargs are passed to CollectionWriter. A mock API client is used
+ only when the caller does not supply api_client.
+ """
+ if 'api_client' not in kwargs:
+ # Unlike setdefault(), this builds the mock only when it is needed.
+ kwargs['api_client'] = self.api_client_mock()
+ writer = arvados.CollectionWriter(**kwargs)
writer.start_new_file('foo')
writer.write('foo')
return writer
with self.assertRaises(arvados.errors.KeepWriteError):
writer.finish()
+ def test_write_insufficient_replicas_via_proxy(self):
+ # Keep reports only 2 stored replicas, which cannot satisfy
+ # replication=3.
+ writer = self.foo_writer(replication=3)
+ # mock_keep forwards its extra keyword arguments as **headers, so
+ # pass the header itself rather than headers={...}.
+ with self.mock_keep(None, 200, **{'x-keep-replicas-stored': 2}):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ writer.manifest_text()
+
+ def test_write_insufficient_replicas_via_disks(self):
+ # Two disk services, each reporting one stored replica, can hold at
+ # most 2 of the 3 requested copies, so the write must fail.
+ client = mock.MagicMock(name='api_client')
+ self.mock_keep_services(client, 200, self.disk_services_available(2))
+ writer = self.foo_writer(api_client=client, replication=3)
+ with self.mock_keep(
+ None, 200, 200,
+ **{'x-keep-replicas-stored': 1}):
+ with self.assertRaises(arvados.errors.KeepWriteError):
+ writer.manifest_text()
+
+ def test_write_three_replicas(self):
+ # Six disk services; every successful PUT reports one stored replica.
+ # The test expects exactly 5 PUTs: three 200s (one replica each)
+ # interleaved with two 500s, leaving the last response unused.
+ client = mock.MagicMock(name='api_client')
+ services = self.disk_services_available(6)
+ self.mock_keep_services(client, 200, services)
+ writer = self.foo_writer(api_client=client, replication=3)
+ put_codes = (200, 500, 200, 500, 200, 200)
+ one_replica = {'x-keep-replicas-stored': 1}
+ with self.mock_keep(None, *put_codes, **one_replica) as keepmock:
+ writer.manifest_text()
+ self.assertEqual(5, keepmock.call_count)
+
def test_write_whole_collection_through_retries(self):
writer = self.foo_writer(num_retries=2)
with self.mock_keep(self.DEFAULT_DATA_HASH,