mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
'0:0:blub.txt 0:0:submit_tool.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
- 'name': 'New collection'},
- ensure_unique_name=True),
+ 'name': 'New collection',
+ 'replication_desired': None,
+ }, ensure_unique_name=True),
mock.call().execute(num_retries=4),
mock.call(body={
'manifest_text':
mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
'0:0:blub.txt 0:0:submit_tool.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
- 'name': 'New collection'},
- ensure_unique_name=True),
+ 'name': 'New collection',
+ 'replication_desired': None,
+ }, ensure_unique_name=True),
mock.call().execute(num_retries=4),
mock.call(body={
'manifest_text':
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep):
+ def __init__(self, keep, copies=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = {}
self.prefetch_enabled = True
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.copies = copies
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if bufferblock is None:
return
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
if sync:
try:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ if self.copies is None:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ else:
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
return self._get_manifest_text(stream_name, strip, normalize)
@synchronized
- def _get_manifest_text(self, stream_name, strip, normalize):
+ def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
"""Get the manifest text for this collection, sub collections and files.
:stream_name:
is not modified, return the original manifest text even if it is not
in normalized form.
+ :only_committed:
+ If True, only include blocks that were already committed to Keep.
+
"""
if not self.committed() or self._manifest_text is None or normalize:
for segment in arvfile.segments():
loc = segment.locator
if arvfile.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
if strip:
loc = KeepLocator(loc).stripped()
num_retries=None,
parent=None,
apiconfig=None,
- block_manager=None):
+ block_manager=None,
+ replication_desired=None):
"""Collection constructor.
:manifest_locator_or_text:
a manifest, raw manifest text, or None (to create an empty collection).
:parent:
the parent Collection, may be None.
+
:apiconfig:
A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
Prefer this over supplying your own api_client and keep_client (except in testing).
Will use default config settings if not specified.
+
:api_client:
The API client object to use for requests. If not specified, create one using `apiconfig`.
+
:keep_client:
the Keep client to use for requests. If not specified, create one using `apiconfig`.
+
:num_retries:
the number of retries for API and Keep requests.
+
:block_manager:
the block manager to use. If not specified, create one.
+ :replication_desired:
+ How many copies should Arvados maintain. If None, API server default
+ configuration applies. If not None, this value will also be used
+ for determining the number of block copies being written.
+
"""
super(Collection, self).__init__(parent)
self._api_client = api_client
self._keep_client = keep_client
self._block_manager = block_manager
+ self.replication_desired = replication_desired
if apiconfig:
self._config = apiconfig
def _my_api(self):
if self._api_client is None:
self._api_client = ThreadSafeApiCache(self._config)
- self._keep_client = self._api_client.keep
+ if self._keep_client is None:
+ self._keep_client = self._api_client.keep
return self._api_client
@synchronized
@synchronized
def _my_block_manager(self):
if self._block_manager is None:
- self._block_manager = _BlockManager(self._my_keep())
+ copies = (self.replication_desired or
+ self._my_api()._rootDesc.get('defaultCollectionReplication',
+ 2))
+ self._block_manager = _BlockManager(self._my_keep(), copies=copies)
return self._block_manager
def _remember_api_response(self, response):
uuid=self._manifest_locator).execute(
num_retries=self.num_retries))
self._manifest_text = self._api_response['manifest_text']
+ # If not overriden via kwargs, we should try to load the
+ # replication_desired from the API server
+ if self.replication_desired is None:
+ self.replication_desired = self._api_response.get('replication_desired', None)
return None
except Exception as e:
return e
ensure_unique_name = True
body = {"manifest_text": text,
- "name": name}
+ "name": name,
+ "replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
def get_from_cache(self, locator):
self.requests.append(locator)
return self.blocks.get(locator)
- def put(self, data, num_retries=None):
+ def put(self, data, num_retries=None, copies=None):
pdh = tutil.str_keep_locator(data)
self.blocks[pdh] = str(data)
return pdh
def __init__(self, b, r):
self.body = b
self.response = r
+ self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
+ self._rootDesc = {}
+ class MockSchema(object):
+ def __init__(self):
+ self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
class MockCollections(object):
def __init__(self, b, r):
self.body = b
def test_truncate(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate",
- "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"},
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"})
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
def test_write_to_end(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
- "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"},
+ "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"})
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
def test_write_large(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
- "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"},
+ "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"})
with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
def test_write_large_rewrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
- "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"},
+ "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"})
with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
def test_create(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_create",
- "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+ "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
with Collection(api_client=api, keep_client=keep) as c:
def test_create_subdir(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_create",
- "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+ "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
with Collection(api_client=api, keep_client=keep) as c:
def test_overwrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_overwrite",
- "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+ "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
def test_create_multiple(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_create_multiple",
- "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"},
+ "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n",
+ "replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
"manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"})
with Collection(api_client=api, keep_client=keep) as c:
class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
+ def test_replication_desired_kept_on_load(self):
+ m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+ c1 = Collection(m, replication_desired=1)
+ c1.save_new()
+ loc = c1.manifest_locator()
+ c2 = Collection(loc)
+ self.assertEqual(c1.manifest_text, c2.manifest_text)
+ self.assertEqual(c1.replication_desired, c2.replication_desired)
+
+ def test_replication_desired_not_loaded_if_provided(self):
+ m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+ c1 = Collection(m, replication_desired=1)
+ c1.save_new()
+ loc = c1.manifest_locator()
+ c2 = Collection(loc, replication_desired=2)
+ self.assertEqual(c1.manifest_text, c2.manifest_text)
+ self.assertNotEqual(c1.replication_desired, c2.replication_desired)
+
def test_init_manifest(self):
m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
self.assertEqual(c1["count1.txt"].size(), 0)
+class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
+ def test_get_manifest_text_only_committed(self):
+ c = Collection()
+ with c.open("count.txt", "w") as f:
+ # One file committed
+ with c.open("foo.txt", "w") as foo:
+ foo.write("foo")
+ f.write("0123456789")
+ # Other file not committed. Block not written to keep yet.
+ self.assertEqual(
+ c._get_manifest_text(".",
+ strip=False,
+ normalize=False,
+ only_committed=True),
+ '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
+ # And now with the file closed...
+ self.assertEqual(
+ c._get_manifest_text(".",
+ strip=False,
+ normalize=False,
+ only_committed=True),
+ ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
+
+
class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}