else:
return self._manifest_text
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks=None):
+        """Scan through the entire collection and ask Keep to copy remote blocks.
+
+        When accessing a remote collection, blocks will have a remote signature
+        (+R instead of +A). Collect these signatures and request Keep to copy the
+        blocks to the local cluster, returning local (+A) signatures.
+
+        Every segment whose locator is rewritten also marks this collection as
+        uncommitted, so the new locators get saved.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories. When omitted (None), a fresh empty cache
+          is created for this call.
+
+        Returns the cache dict, including any mappings added during the scan.
+
+        """
+        # Do not use `{}` as the default value: a default dict is created once
+        # at definition time and would be shared by every call that omits the
+        # argument, leaking cached signatures between unrelated collections.
+        if remote_blocks is None:
+            remote_blocks = {}
+        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
+            for s in self[filename].segments():
+                if '+R' in s.locator:
+                    try:
+                        loc = remote_blocks[s.locator]
+                    except KeyError:
+                        # First time this remote block is seen: ask Keep for
+                        # a replacement locator and remember it.
+                        loc = self._my_keep().refresh_signature(s.locator)
+                        remote_blocks[s.locator] = loc
+                    s.locator = loc
+                    self.set_committed(False)
+        # Recurse into subcollections, threading the same cache through.
+        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
+            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        return remote_blocks
+
@synchronized
def diff(self, end_collection, prefix=".", holding_collection=None):
"""Generate list of add/modify/delete actions.
def _has_collection_uuid(self):
return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
+    def _has_local_collection_uuid(self):
+        """Tell whether the manifest locator is a collection UUID on the local cluster.
+
+        Returns a truthy value only when the manifest locator is a collection
+        UUID and its prefix (the text before the first '-') equals the
+        'uuidPrefix' reported by the API client's root description.
+        """
+        # _has_collection_uuid must be *called*: the bare bound method is
+        # always truthy, which would skip the check and let a None
+        # _manifest_locator reach .split() below.
+        return self._has_collection_uuid() and \
+            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
+
def __enter__(self):
return self
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+
if not self.committed():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
+ elif not self._has_local_collection_uuid():
+ raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
self._my_block_manager().commit_all()
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServers):
MAIN_SERVER = {}
KEEP_SERVER = {}
+ local_locator_re = r"[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
+ remote_locator_re = r"[0-9a-f]{32}\+\d+\+R[a-z]{5}-[a-f0-9]{40}@[a-f0-9]{8}"
def setUp(self):
self.keep_put = getattr(arvados.keep.KeepClient, 'put')
- def test_repacked_block_submission_get_permission_token(self):
+ @mock.patch('arvados.keep.KeepClient.put', autospec=True)
+ def test_repacked_block_submission_get_permission_token(self, mocked_put):
'''
Make sure that those blocks that are committed after repacking small ones,
get their permission tokens assigned on the collection manifest.
time.sleep(1)
return self.keep_put(*args, **kwargs)
- re_locator = "[0-9a-f]{32}\+\d+\+A[a-f0-9]{40}@[a-f0-9]{8}"
-
- with mock.patch('arvados.keep.KeepClient.put', autospec=True) as mocked_put:
- mocked_put.side_effect = wrapped_keep_put
- c = Collection()
- # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
- # small ones before finishing the upload.
- for i in range(70):
- f = c.open("file_{}.txt".format(i), 'wb')
- f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
- f.close(flush=False)
- # We should get 2 blocks with their tokens
- self.assertEqual(len(re.findall(re_locator, c.manifest_text())), 2)
+ mocked_put.side_effect = wrapped_keep_put
+ c = Collection()
+ # Write 70 files ~1MiB each so we force to produce 1 big block by repacking
+ # small ones before finishing the upload.
+ for i in range(70):
+ f = c.open("file_{}.txt".format(i), 'wb')
+ f.write(random.choice('abcdefghijklmnopqrstuvwxyz') * (2**20+i))
+ f.close(flush=False)
+ # We should get 2 blocks with their tokens
+ self.assertEqual(len(re.findall(self.local_locator_re, c.manifest_text())), 2)
+
+ # Checks that save_new() swaps a remote-signed (+R) locator for the
+ # local-signed (+A) one handed back by KeepClient.refresh_signature,
+ # which is replaced by a mock here.
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save_new(self, rs_mock):
+ # "+Remote-" is shaped to satisfy remote_locator_re: 'R', five lowercase
+ # letters, then '-'.
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ # The mocked refresh_signature always answers with the local locator.
+ rs_mock.return_value = local_block_loc
+ c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ # Before saving, the manifest holds exactly one remote-signed locator.
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+ c.save_new()
+ rs_mock.assert_called()
+ # After saving, the manifest holds exactly one local-signed locator.
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, c.manifest_text())), 1)
+
+ # Checks that save() on an already-saved collection swaps the remote-signed
+ # (+R) locator of a file copied in from another collection for the
+ # local-signed (+A) one handed back by the mocked
+ # KeepClient.refresh_signature.
+ @mock.patch('arvados.keep.KeepClient.refresh_signature')
+ def test_copy_remote_blocks_on_save(self, rs_mock):
+ # "+Remote-" is shaped to satisfy remote_locator_re: 'R', five lowercase
+ # letters, then '-'.
+ remote_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Remote-" + "a" * 40 + "@abcdef01"
+ local_block_loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+A" + "b" * 40 + "@abcdef01"
+ # The mocked refresh_signature always answers with the local locator.
+ rs_mock.return_value = local_block_loc
+ # Remote collection
+ remote_c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, remote_c.manifest_text())), 1)
+ # Local collection
+ local_c = Collection()
+ with local_c.open('barfile.txt', 'wb') as f:
+ f.write('bar')
+ local_c.save_new()
+ # Freshly saved: one local-signed locator and no remote-signed ones.
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
+ # Copy remote file to local collection
+ local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+ # The copied file still carries its remote-signed locator until save().
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
+ # Save local collection: remote block should be copied
+ local_c.save()
+ # Both files now use local-signed locators; none are remote-signed.
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):