if self._put_queue is not None:
self._put_queue.task_done()
- with self.lock:
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
- self._put_errors = Queue.Queue()
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
++ if block.state() != _BufferBlock.WRITABLE:
++ return
++
+ if wait:
+ block.set_state(_BufferBlock.PENDING)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ block.set_state(_BufferBlock.COMMITTED, loc)
+ else:
+ with self.lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+ self._put_errors = Queue.Queue()
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
- if block.state() == _BufferBlock.WRITABLE:
# Mark the block as PENDING so to disallow any more appends.
block.set_state(_BufferBlock.PENDING)
self._put_queue.put(block)
items = self._bufferblocks.items()
for k,v in items:
- v.owner.flush()
+ if v.state() == _BufferBlock.WRITABLE:
- self.commit_bufferblock(v, False)
++ v.owner.flush(False)
with self.lock:
if self._put_queue is not None:
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self._repack_writes()
+ self._repack_writes(num_retries)
if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
self._current_bblock.append(data)
replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
+
+ return len(data)
+
@synchronized
- def flush(self, wait=True):
- def flush(self, num_retries=0):
- if self._current_bblock:
- self._repack_writes(num_retries)
- self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
++ def flush(self, wait=True, num_retries=0):
+ if self.modified():
+ if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
- self._repack_writes()
++ self._repack_writes(num_retries)
+ self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
+ self.parent.notify(MOD, self.parent, self.name, (self, self))
@must_be_writable
@synchronized
:overwrite:
Whether to overwrite target file if it already exists.
"""
- if source_collection is None:
- source_collection = self
- # Find the object to copy
- if isinstance(source, basestring):
- source_obj = source_collection.find(source)
- if source_obj is None:
- raise IOError((errno.ENOENT, "File not found"))
- sourcecomponents = source.split("/")
- else:
- source_obj = source
- sourcecomponents = None
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+ target_dir.add(source_obj, target_name, overwrite, False)
- # Find parent collection the target path
- targetcomponents = target_path.split("/")
+ @must_be_writable
+ @synchronized
+ def rename(self, source, target_path, source_collection=None, overwrite=False):
+ """Move a file or subcollection from `source_collection` to a new path in this collection.
- # Determine the name to use.
- target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+ :source:
+ A string with a path to source file or subcollection.
- if not target_name:
- raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
+ :target_path:
+ Destination file or path. If the target path already exists and is a
+ subcollection, the item will be placed inside the subcollection. If
+ the target path already exists and is a file, this will raise an error
+ unless you specify `overwrite=True`.
- target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+ :source_collection:
+ Collection to copy `source_path` from (default `self`)
- if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
- target_dir = target_dir[target_name]
- target_name = sourcecomponents[-1]
+ :overwrite:
+ Whether to overwrite target file if it already exists.
+ """
- target_dir.add(source_obj, target_name, overwrite)
+ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+ if not source_obj.writable():
+ raise IOError(errno.EROFS, "Source collection must be writable.")
+ target_dir.add(source_obj, target_name, overwrite, True)
- @synchronized
+ def portable_manifest_text(self, stream_name="."):
+ """Get the manifest text for this collection, sub collections and files.
+
+ This method does not flush outstanding blocks to Keep. It will return
+ a normalized manifest with access tokens stripped.
+
+ :stream_name:
+ Name to use for this stream (directory)
+
+ """
+ return self._get_manifest_text(stream_name, True, True)
+
def manifest_text(self, stream_name=".", strip=False, normalize=False):
"""Get the manifest text for this collection, sub collections and files.
def portable_data_hash(self):
"""Get the portable data hash for this collection's manifest."""
- stripped = self.manifest_text(strip=True)
+ stripped = self.portable_manifest_text()
return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+ @synchronized
+ def subscribe(self, callback):
+ if self._callback is None:
+ self._callback = callback
+ else:
+ raise errors.ArgumentError("A callback is already set on this collection.")
+
+ @synchronized
+ def unsubscribe(self):
+ if self._callback is not None:
+ self._callback = None
+
+ @synchronized
+ def notify(self, event, collection, name, item):
+ if self._callback:
+ self._callback(event, collection, name, item)
+ self.root_collection().notify(event, collection, name, item)
+
@synchronized
def __eq__(self, other):
if other is self:
self._manifest_locator = self._api_response["uuid"]
- self._manifest_text = text
- self.set_unmodified()
+ self._manifest_text = text
+ self.set_unmodified()
+
+ return text
- @synchronized
- def subscribe(self, callback):
- self.callbacks.append(callback)
-
- @synchronized
- def unsubscribe(self, callback):
- self.callbacks.remove(callback)
-
- @synchronized
- def notify(self, event, collection, name, item):
- for c in self.callbacks:
- c(event, collection, name, item)
-
@synchronized
def _import_manifest(self, manifest_text):
"""Import a manifest into a `Collection`.
mockkeep = mock.MagicMock()
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+ bufferblock.owner = mock.MagicMock()
++ bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
bufferblock.append("foo")
blockmanager.commit_all()
+ self.assertTrue(bufferblock.owner.flush.called)
self.assertTrue(mockkeep.put.called)
self.assertEqual(bufferblock.state(), arvados.arvfile._BufferBlock.COMMITTED)
self.assertIsNone(bufferblock.buffer_view)
mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
blockmanager = arvados.arvfile._BlockManager(mockkeep)
bufferblock = blockmanager.alloc_bufferblock()
- bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+ bufferblock.owner = mock.MagicMock()
++ bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
bufferblock.append("foo")
with self.assertRaises(arvados.errors.KeepWriteError) as err:
blockmanager.commit_all()
def test_copy_to_new_dir(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
c.copy("count1.txt", "foo/")
- self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
+ def test_rename_file(self):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c.rename("count1.txt", "count2.txt")
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+ def test_move_file_to_dir(self):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c.mkdirs("foo")
+ c.rename("count1.txt", "foo/count2.txt")
+ self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+ def test_move_file_to_other(self):
+ c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+ c2 = Collection()
+ c2.rename("count1.txt", "count2.txt", source_collection=c1)
+ self.assertEqual("", c1.manifest_text())
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
+
def test_clone(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
cl = c.clone()