From: Lucas Di Pentima Date: Fri, 26 Aug 2016 21:04:45 +0000 (-0300) Subject: Merge branch '9463-pysdk-amended' X-Git-Tag: 1.1.0~775 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/9b98829c565a2aa487d21ecd7f9429c23d0cec20?hp=7213d3096cdb5d5e03b559a04f88fcd22a835076 Merge branch '9463-pysdk-amended' Refs #9463 --- diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index 2dff3e8f9d..21be5c71b9 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -156,8 +156,9 @@ class TestSubmit(unittest.TestCase): mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 ' '0:0:blub.txt 0:0:submit_tool.cwl\n', 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz', - 'name': 'New collection'}, - ensure_unique_name=True), + 'name': 'New collection', + 'replication_desired': None, + }, ensure_unique_name=True), mock.call().execute(num_retries=4), mock.call(body={ 'manifest_text': @@ -215,8 +216,9 @@ class TestSubmit(unittest.TestCase): mock.call(body={'manifest_text': '. 
d41d8cd98f00b204e9800998ecf8427e+0 ' '0:0:blub.txt 0:0:submit_tool.cwl\n', 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz', - 'name': 'New collection'}, - ensure_unique_name=True), + 'name': 'New collection', + 'replication_desired': None, + }, ensure_unique_name=True), mock.call().execute(num_retries=4), mock.call(body={ 'manifest_text': diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index b78c63e301..f2f7df2dce 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -402,7 +402,7 @@ class _BlockManager(object): DEFAULT_PUT_THREADS = 2 DEFAULT_GET_THREADS = 2 - def __init__(self, keep): + def __init__(self, keep, copies=None): """keep: KeepClient object to use""" self._keep = keep self._bufferblocks = {} @@ -414,6 +414,7 @@ class _BlockManager(object): self.prefetch_enabled = True self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS + self.copies = copies @synchronized def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None): @@ -464,7 +465,10 @@ class _BlockManager(object): if bufferblock is None: return - loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()) + else: + loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies) bufferblock.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: @@ -577,7 +581,10 @@ class _BlockManager(object): if sync: try: - loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + if self.copies is None: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + else: + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies) block.set_state(_BufferBlock.COMMITTED, loc) except Exception as e: block.set_state(_BufferBlock.ERROR, e) diff 
--git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 8450bd1ca0..56d8b23933 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -922,7 +922,7 @@ class RichCollectionBase(CollectionBase): return self._get_manifest_text(stream_name, strip, normalize) @synchronized - def _get_manifest_text(self, stream_name, strip, normalize): + def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False): """Get the manifest text for this collection, sub collections and files. :stream_name: @@ -938,6 +938,9 @@ class RichCollectionBase(CollectionBase): is not modified, return the original manifest text even if it is not in normalized form. + :only_committed: + If True, only include blocks that were already committed to Keep. + """ if not self.committed() or self._manifest_text is None or normalize: @@ -951,6 +954,8 @@ class RichCollectionBase(CollectionBase): for segment in arvfile.segments(): loc = segment.locator if arvfile.parent._my_block_manager().is_bufferblock(loc): + if only_committed: + continue loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator() if strip: loc = KeepLocator(loc).stripped() @@ -1135,7 +1140,8 @@ class Collection(RichCollectionBase): num_retries=None, parent=None, apiconfig=None, - block_manager=None): + block_manager=None, + replication_desired=None): """Collection constructor. :manifest_locator_or_text: @@ -1143,24 +1149,35 @@ class Collection(RichCollectionBase): a manifest, raw manifest text, or None (to create an empty collection). :parent: the parent Collection, may be None. + :apiconfig: A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN. Prefer this over supplying your own api_client and keep_client (except in testing). Will use default config settings if not specified. + :api_client: The API client object to use for requests. If not specified, create one using `apiconfig`. + :keep_client: the Keep client to use for requests. 
If not specified, create one using `apiconfig`. + :num_retries: the number of retries for API and Keep requests. + :block_manager: the block manager to use. If not specified, create one. + :replication_desired: + How many copies should Arvados maintain. If None, API server default + configuration applies. If not None, this value will also be used + for determining the number of block copies being written. + """ super(Collection, self).__init__(parent) self._api_client = api_client self._keep_client = keep_client self._block_manager = block_manager + self.replication_desired = replication_desired if apiconfig: self._config = apiconfig @@ -1232,7 +1249,8 @@ class Collection(RichCollectionBase): def _my_api(self): if self._api_client is None: self._api_client = ThreadSafeApiCache(self._config) - self._keep_client = self._api_client.keep + if self._keep_client is None: + self._keep_client = self._api_client.keep return self._api_client @synchronized @@ -1247,7 +1265,10 @@ class Collection(RichCollectionBase): @synchronized def _my_block_manager(self): if self._block_manager is None: - self._block_manager = _BlockManager(self._my_keep()) + copies = (self.replication_desired or + self._my_api()._rootDesc.get('defaultCollectionReplication', + 2)) + self._block_manager = _BlockManager(self._my_keep(), copies=copies) return self._block_manager def _remember_api_response(self, response): @@ -1267,6 +1288,10 @@ class Collection(RichCollectionBase): uuid=self._manifest_locator).execute( num_retries=self.num_retries)) self._manifest_text = self._api_response['manifest_text'] + # If not overridden via kwargs, we should try to load the + # replication_desired from the API server + if self.replication_desired is None: + self.replication_desired = self._api_response.get('replication_desired', None) return None except Exception as e: return e @@ -1477,7 +1502,8 @@ class Collection(RichCollectionBase): ensure_unique_name = True body = {"manifest_text": text, - "name": name} + "name": 
name, + "replication_desired": self.replication_desired} if owner_uuid: body["owner_uuid"] = owner_uuid diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index ea8661437c..6b3562602a 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -28,7 +28,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def get_from_cache(self, locator): self.requests.append(locator) return self.blocks.get(locator) - def put(self, data, num_retries=None): + def put(self, data, num_retries=None, copies=None): pdh = tutil.str_keep_locator(data) self.blocks[pdh] = str(data) return pdh @@ -37,6 +37,11 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def __init__(self, b, r): self.body = b self.response = r + self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema() + self._rootDesc = {} + class MockSchema(object): + def __init__(self): + self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}} class MockCollections(object): def __init__(self, b, r): self.body = b @@ -59,7 +64,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_truncate(self): keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate", - "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"}, + "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"}) with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', @@ -86,7 +92,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_write_to_end(self): keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_append", - "manifest_text": ". 
781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"}, + "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"}) with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', @@ -222,7 +229,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_write_large(self): keep = ArvadosFileWriterTestCase.MockKeep({}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large", - "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"}, + "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"}) with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt', @@ -313,7 +321,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_write_large_rewrite(self): keep = ArvadosFileWriterTestCase.MockKeep({}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large", - "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"}, + "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text": ". 
37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"}) with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt', @@ -335,7 +344,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_create(self): keep = ArvadosFileWriterTestCase.MockKeep({}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_create", - "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}, + "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}) with Collection(api_client=api, keep_client=keep) as c: @@ -356,7 +366,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_create_subdir(self): keep = ArvadosFileWriterTestCase.MockKeep({}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_create", - "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}, + "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}) with Collection(api_client=api, keep_client=keep) as c: @@ -371,7 +382,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_overwrite(self): keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_overwrite", - "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}, + "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"}) with Collection('. 
781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', @@ -400,7 +412,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase): def test_create_multiple(self): keep = ArvadosFileWriterTestCase.MockKeep({}) api = ArvadosFileWriterTestCase.MockApi({"name":"test_create_multiple", - "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"}, + "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n", + "replication_desired":None}, {"uuid":"zzzzz-4zz18-mockcollection0", "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"}) with Collection(api_client=api, keep_client=keep) as c: diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index ff0d6847ce..cf8f23e375 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -804,6 +804,24 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin): class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): + def test_replication_desired_kept_on_load(self): + m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n' + c1 = Collection(m, replication_desired=1) + c1.save_new() + loc = c1.manifest_locator() + c2 = Collection(loc) + self.assertEqual(c1.manifest_text, c2.manifest_text) + self.assertEqual(c1.replication_desired, c2.replication_desired) + + def test_replication_desired_not_loaded_if_provided(self): + m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n' + c1 = Collection(m, replication_desired=1) + c1.save_new() + loc = c1.manifest_locator() + c2 = Collection(loc, replication_desired=2) + self.assertEqual(c1.manifest_text, c2.manifest_text) + self.assertNotEqual(c1.replication_desired, c2.replication_desired) + def test_init_manifest(self): m1 = """. 
5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt @@ -1064,6 +1082,30 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): self.assertEqual(c1["count1.txt"].size(), 0) +class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers): + def test_get_manifest_text_only_committed(self): + c = Collection() + with c.open("count.txt", "w") as f: + # One file committed + with c.open("foo.txt", "w") as foo: + foo.write("foo") + f.write("0123456789") + # Other file not committed. Block not written to keep yet. + self.assertEqual( + c._get_manifest_text(".", + strip=False, + normalize=False, + only_committed=True), + '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n') + # And now with the file closed... + self.assertEqual( + c._get_manifest_text(".", + strip=False, + normalize=False, + only_committed=True), + ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n") + + class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {}