From 4077a9af0985d3c85f2f2de2bb7a0f6be581e71e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 21 Apr 2015 11:44:38 -0400 Subject: [PATCH] 3198: Implement rename() (efficient move within/between collections). --- sdk/python/arvados/arvfile.py | 89 ++++++++++++-------- sdk/python/arvados/collection.py | 109 ++++++++++++++++++------- sdk/python/tests/test_arvfile.py | 8 +- sdk/python/tests/test_collections.py | 30 +++++-- services/fuse/arvados_fuse/__init__.py | 17 ++-- services/fuse/tests/test_mount.py | 2 + 6 files changed, 174 insertions(+), 81 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index eb17bd4e3d..95acb9c31d 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -441,11 +441,17 @@ class _BlockManager(object): self._prefetch_threads = None self._prefetch_queue = None - def commit_bufferblock(self, block): + def commit_bufferblock(self, block, wait): """Initiate a background upload of a bufferblock. - This will block if the upload queue is at capacity, otherwise it will - return immediately. + :block: + The block object to upload + + :wait: + If `wait` is True, upload the block synchronously. + If `wait` is False, upload the block asynchronously. This will + return immediately unless if the upload queue is at capacity, in + which case it will wait on an upload queue slot. """ @@ -467,32 +473,37 @@ class _BlockManager(object): if self._put_queue is not None: self._put_queue.task_done() - with self.lock: - if self._put_threads is None: - # Start uploader threads. - - # If we don't limit the Queue size, the upload queue can quickly - # grow to take up gigabytes of RAM if the writing process is - # generating data more quickly than it can be send to the Keep - # servers. - # - # With two upload threads and a queue size of 2, this means up to 4 - # blocks pending. 
If they are full 64 MiB blocks, that means up to - # 256 MiB of internal buffering, which is the same size as the - # default download block cache in KeepClient. - self._put_queue = Queue.Queue(maxsize=2) - self._put_errors = Queue.Queue() - - self._put_threads = [] - for i in xrange(0, self.num_put_threads): - thread = threading.Thread(target=commit_bufferblock_worker, args=(self,)) - self._put_threads.append(thread) - thread.daemon = True - thread.start() - - # Mark the block as PENDING so to disallow any more appends. - block.set_state(_BufferBlock.PENDING) - self._put_queue.put(block) + if wait: + block.set_state(_BufferBlock.PENDING) + loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()) + block.set_state(_BufferBlock.COMMITTED, loc) + else: + with self.lock: + if self._put_threads is None: + # Start uploader threads. + + # If we don't limit the Queue size, the upload queue can quickly + # grow to take up gigabytes of RAM if the writing process is + # generating data more quickly than it can be send to the Keep + # servers. + # + # With two upload threads and a queue size of 2, this means up to 4 + # blocks pending. If they are full 64 MiB blocks, that means up to + # 256 MiB of internal buffering, which is the same size as the + # default download block cache in KeepClient. + self._put_queue = Queue.Queue(maxsize=2) + self._put_errors = Queue.Queue() + + self._put_threads = [] + for i in xrange(0, self.num_put_threads): + thread = threading.Thread(target=commit_bufferblock_worker, args=(self,)) + self._put_threads.append(thread) + thread.daemon = True + thread.start() + + # Mark the block as PENDING so to disallow any more appends. 
+ block.set_state(_BufferBlock.PENDING) + self._put_queue.put(block) @synchronized def get_bufferblock(self, locator): @@ -530,7 +541,7 @@ class _BlockManager(object): for k,v in items: if v.state() == _BufferBlock.WRITABLE: - self.commit_bufferblock(v) + self.commit_bufferblock(v, False) with self.lock: if self._put_queue is not None: @@ -605,13 +616,13 @@ class ArvadosFile(object): a list of Range objects representing segments """ self.parent = parent + self.name = name self._modified = True self._segments = [] self.lock = parent.root_collection().lock for s in segments: self._add_segment(stream, s.locator, s.range_size) self._current_bblock = None - self.name = name def writable(self): return self.parent.writable() @@ -800,7 +811,7 @@ class ArvadosFile(object): if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: self._repack_writes() if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE: - self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False) self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self) self._current_bblock.append(data) @@ -812,11 +823,11 @@ class ArvadosFile(object): return len(data) @synchronized - def flush(self): + def flush(self, wait=True): if self.modified(): if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE: self._repack_writes() - self.parent._my_block_manager().commit_bufferblock(self._current_bblock) + self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait) self.parent.notify(MOD, self.parent, self.name, (self, self)) @must_be_writable @@ -863,6 +874,16 @@ class ArvadosFile(object): buf += "\n" return buf + @must_be_writable + @synchronized + def reparent(self, newparent, newname): + self.flush() + self.parent.remove(self.name) + + self.parent = newparent + self.name = newname + self.lock = self.parent.root_collection().lock 
+ self._modified = True class ArvadosFileReader(ArvadosFileReaderBase): """Wraps ArvadosFile in a file-like object supporting reading only. diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index d610c3509d..0ecc34a1df 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -741,8 +741,8 @@ class RichCollectionBase(CollectionBase): @must_be_writable @synchronized - def add(self, source_obj, target_name, overwrite=False): - """Copy a file or subcollection to this collection. + def add(self, source_obj, target_name, overwrite=False, reparent=False): + """Copy or move a file or subcollection to this collection. :source_obj: An ArvadosFile, or Subcollection object @@ -754,6 +754,11 @@ class RichCollectionBase(CollectionBase): :overwrite: Whether to overwrite target file if it already exists. + :reparent: + If True, source_obj will be moved from its parent collection to this collection. + If False, source_obj will be copied and the parent collection will be + unmodified. + """ if target_name in self and not overwrite: @@ -763,15 +768,60 @@ class RichCollectionBase(CollectionBase): if target_name in self: modified_from = self[target_name] - # Actually make the copy. - dup = source_obj.clone(self, target_name) - self._items[target_name] = dup + # Actually make the move or copy. 
+        if reparent:
+            source_obj.reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
         self._modified = True
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        """Resolve the arguments of copy()/rename() to concrete objects.
+
+        Returns (source_obj, target_dir, target_name).  `source` is either a
+        path looked up in `source_collection` (default `self`) or an object.
+        Raises IOError(ENOENT) for a missing source path or target directory,
+        and ArgumentError when no target name can be determined.
+        """
+        if source_collection is None:
+            source_collection = self
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found")
+            sourcecomponents = source.split("/")
+        else:
+            source_obj = source
+            sourcecomponents = None
+        targetcomponents = target_path.split("/")
+        # Fall back to the source's own name; an object source has none (None).
+        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
         else:
-            self.notify(ADD, self, target_name, dup)
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found.")
+        # If the target already exists as a subcollection, place the item inside it.
+        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
@@ -793,35 +843,35 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
""" - if source_collection is None: - source_collection = self - # Find the object to copy - if isinstance(source, basestring): - source_obj = source_collection.find(source) - if source_obj is None: - raise IOError(errno.ENOENT, "File not found") - sourcecomponents = source.split("/") - else: - source_obj = source - sourcecomponents = None + source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) + target_dir.add(source_obj, target_name, overwrite, False) - # Find parent collection the target path - targetcomponents = target_path.split("/") + @must_be_writable + @synchronized + def rename(self, source, target_path, source_collection=None, overwrite=False): + """Move a file or subcollection from `source_collection` to a new path in this collection. - # Determine the name to use. - target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None) + :source: + A string with a path to source file or subcollection. - if not target_name: - raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.") + :target_path: + Destination file or path. If the target path already exists and is a + subcollection, the item will be placed inside the subcollection. If + the target path already exists and is a file, this will raise an error + unless you specify `overwrite=True`. - target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION) + :source_collection: + Collection to copy `source_path` from (default `self`) - if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents: - target_dir = target_dir[target_name] - target_name = sourcecomponents[-1] + :overwrite: + Whether to overwrite target file if it already exists. 
+ """ - target_dir.add(source_obj, target_name, overwrite) + source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) + if not source_obj.writable(): + raise IOError(errno.EROFS, "Source collection must be writable.") + target_dir.add(source_obj, target_name, overwrite, True) @synchronized def manifest_text(self, stream_name=".", strip=False, normalize=False): @@ -1337,6 +1387,7 @@ class Collection(RichCollectionBase): if create_collection_record: if name is None: name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime())) + ensure_unique_name = True body = {"manifest_text": text, "name": name} diff --git a/sdk/python/tests/test_arvfile.py b/sdk/python/tests/test_arvfile.py index 92c778e1c5..3041e28d0a 100644 --- a/sdk/python/tests/test_arvfile.py +++ b/sdk/python/tests/test_arvfile.py @@ -450,8 +450,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): def test__eq__from_writes(self): with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1: with Collection() as c2: - with c2.open("count1.txt", "w") as f: - f.write("0123456789") + f = c2.open("count1.txt", "w") + f.write("0123456789") self.assertTrue(c1["count1.txt"] == c2["count1.txt"]) self.assertFalse(c1["count1.txt"] != c2["count1.txt"]) @@ -459,8 +459,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase): def test__ne__(self): with Collection('. 
781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1: with Collection() as c2: - with c2.open("count1.txt", "w") as f: - f.write("1234567890") + f = c2.open("count1.txt", "w") + f.write("1234567890") self.assertTrue(c1["count1.txt"] != c2["count1.txt"]) self.assertFalse(c1["count1.txt"] == c2["count1.txt"]) diff --git a/sdk/python/tests/test_collections.py b/sdk/python/tests/test_collections.py index 8cf34f0282..95b6dbe6d1 100644 --- a/sdk/python/tests/test_collections.py +++ b/sdk/python/tests/test_collections.py @@ -886,6 +886,24 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): c.copy("count1.txt", "foo/") self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text()) + def test_rename_file(self): + c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n') + c.rename("count1.txt", "count2.txt") + self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text()) + + def test_move_file_to_dir(self): + c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n') + c.mkdirs("foo") + c.rename("count1.txt", "foo/count2.txt") + self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text()) + + def test_move_file_to_other(self): + c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n') + c2 = Collection() + c2.rename("count1.txt", "count2.txt", source_collection=c1) + self.assertEqual("", c1.manifest_text()) + self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text()) + def test_clone(self): c = Collection('. 
781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n') cl = c.clone() @@ -985,8 +1003,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): d = c1.diff(c2) self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]), ('add', './count2.txt', c2["count2.txt"])]) - with c1.open("count1.txt", "w") as f: - f.write("zzzzz") + f = c1.open("count1.txt", "w") + f.write("zzzzz") # c1 changed, so it should not be deleted. c1.apply(d) @@ -997,8 +1015,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt') d = c1.diff(c2) self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])]) - with c1.open("count1.txt", "w") as f: - f.write("zzzzz") + f = c1.open("count1.txt", "w") + f.write("zzzzz") # c1 changed, so c2 mod will go to a conflict file c1.apply(d) @@ -1010,8 +1028,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin): d = c1.diff(c2) self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]), ('add', './count1.txt', c2["count1.txt"])]) - with c1.open("count1.txt", "w") as f: - f.write("zzzzz") + f = c1.open("count1.txt", "w") + f.write("zzzzz") # c1 added count1.txt, so c2 add will go to a conflict file c1.apply(d) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index e5d812824c..e140672cb8 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -472,14 +472,15 @@ class Operations(llfuse.Operations): def rmdir(self, inode_parent, name): self.unlink(inode_parent, name) - # @catch_exceptions - # def rename(self, inode_parent_old, name_old, inode_parent_new, name_new): - # src = self._check_writable(inode_parent_old) - # dest = self._check_writable(inode_parent_new) - # - # with llfuse.lock_released: - # dest.collection.copy(name_old, name_new, 
source_collection=src.collection, overwrite=True) - # src.collection.remove(name_old) + @catch_exceptions + def rename(self, inode_parent_old, name_old, inode_parent_new, name_new): + src = self._check_writable(inode_parent_old) + dest = self._check_writable(inode_parent_new) + + with llfuse.lock_released: + dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) + dest.flush() + src.flush() @catch_exceptions def flush(self, fh): diff --git a/services/fuse/tests/test_mount.py b/services/fuse/tests/test_mount.py index ff641ca695..70cfbef417 100644 --- a/services/fuse/tests/test_mount.py +++ b/services/fuse/tests/test_mount.py @@ -462,6 +462,7 @@ class FuseWriteFileTest(MountTestBase): # Forturnately the multiprocessing module makes this relatively easy. pool = multiprocessing.Pool(1) self.assertTrue(pool.apply(fuseWriteFileTestHelper, (self.mounttmp,))) + pool.close() collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() self.assertRegexpMatches(collection2["manifest_text"], @@ -503,6 +504,7 @@ class FuseUpdateFileTest(MountTestBase): pool = multiprocessing.Pool(1) self.assertTrue(pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,))) self.assertTrue(pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,))) + pool.close() collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute() self.assertRegexpMatches(collection2["manifest_text"], -- 2.30.2