Merge branch 'master' into 3198-writable-fuse
author: Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 21 Apr 2015 17:39:36 +0000 (13:39 -0400)
committer: Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 21 Apr 2015 17:39:36 +0000 (13:39 -0400)
Conflicts:
sdk/python/arvados/arvfile.py

Files changed (combined diff against parent 1 and parent 2):
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py

index 95acb9c31d763bbbce38bad9918f0adb62e39c5c,7742c45c4d3e8ab37f3a7aa7dfe1a88a932ec90e..c0ef5810728e42776d4b57c4f4e6da0b03dd4618
@@@ -473,34 -461,30 +473,37 @@@ class _BlockManager(object)
                      if self._put_queue is not None:
                          self._put_queue.task_done()
  
 -        with self.lock:
 -            if self._put_threads is None:
 -                # Start uploader threads.
 -
 -                # If we don't limit the Queue size, the upload queue can quickly
 -                # grow to take up gigabytes of RAM if the writing process is
 -                # generating data more quickly than it can be send to the Keep
 -                # servers.
 -                #
 -                # With two upload threads and a queue size of 2, this means up to 4
 -                # blocks pending.  If they are full 64 MiB blocks, that means up to
 -                # 256 MiB of internal buffering, which is the same size as the
 -                # default download block cache in KeepClient.
 -                self._put_queue = Queue.Queue(maxsize=2)
 -                self._put_errors = Queue.Queue()
 -
 -                self._put_threads = []
 -                for i in xrange(0, self.num_put_threads):
 -                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
 -                    self._put_threads.append(thread)
 -                    thread.daemon = True
 -                    thread.start()
++        if block.state() != _BufferBlock.WRITABLE:
++            return
++
 +        if wait:
 +            block.set_state(_BufferBlock.PENDING)
 +            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
 +            block.set_state(_BufferBlock.COMMITTED, loc)
 +        else:
 +            with self.lock:
 +                if self._put_threads is None:
 +                    # Start uploader threads.
 +
 +                    # If we don't limit the Queue size, the upload queue can quickly
 +                    # grow to take up gigabytes of RAM if the writing process is
 +                    # generating data more quickly than it can be send to the Keep
 +                    # servers.
 +                    #
 +                    # With two upload threads and a queue size of 2, this means up to 4
 +                    # blocks pending.  If they are full 64 MiB blocks, that means up to
 +                    # 256 MiB of internal buffering, which is the same size as the
 +                    # default download block cache in KeepClient.
 +                    self._put_queue = Queue.Queue(maxsize=2)
 +                    self._put_errors = Queue.Queue()
 +
 +                    self._put_threads = []
 +                    for i in xrange(0, self.num_put_threads):
 +                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
 +                        self._put_threads.append(thread)
 +                        thread.daemon = True
 +                        thread.start()
  
 -        if block.state() == _BufferBlock.WRITABLE:
              # Mark the block as PENDING so to disallow any more appends.
              block.set_state(_BufferBlock.PENDING)
              self._put_queue.put(block)
              items = self._bufferblocks.items()
  
          for k,v in items:
 -            v.owner.flush()
 +            if v.state() == _BufferBlock.WRITABLE:
-                 self.commit_bufferblock(v, False)
++                v.owner.flush(False)
  
          with self.lock:
              if self._put_queue is not None:
@@@ -809,26 -786,20 +813,26 @@@ class ArvadosFile(object)
              self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
  
          if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-             self._repack_writes()
+             self._repack_writes(num_retries)
              if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
 -                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
 +                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                  self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
  
          self._current_bblock.append(data)
  
          replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
  
 +        self.parent.notify(MOD, self.parent, self.name, (self, self))
 +
 +        return len(data)
 +
      @synchronized
-     def flush(self, wait=True):
 -    def flush(self, num_retries=0):
 -        if self._current_bblock:
 -            self._repack_writes(num_retries)
 -            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
++    def flush(self, wait=True, num_retries=0):
 +        if self.modified():
 +            if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
-                 self._repack_writes()
++                self._repack_writes(num_retries)
 +                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
 +            self.parent.notify(MOD, self.parent, self.name, (self, self))
  
      @must_be_writable
      @synchronized
index 0ecc34a1df6a9db688cc9f9d4f969ff7e35d58a1,11eb666478525e853ac2b347ca424c01c9948ea7..c90f7b3bc780802d38f1445ff18b41030e7c7d3a
@@@ -843,37 -792,48 +843,48 @@@ class RichCollectionBase(CollectionBase
          :overwrite:
            Whether to overwrite target file if it already exists.
          """
 -        if source_collection is None:
 -            source_collection = self
  
 -        # Find the object to copy
 -        if isinstance(source, basestring):
 -            source_obj = source_collection.find(source)
 -            if source_obj is None:
 -                raise IOError((errno.ENOENT, "File not found"))
 -            sourcecomponents = source.split("/")
 -        else:
 -            source_obj = source
 -            sourcecomponents = None
 +        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
 +        target_dir.add(source_obj, target_name, overwrite, False)
  
 -        # Find parent collection the target path
 -        targetcomponents = target_path.split("/")
 +    @must_be_writable
 +    @synchronized
 +    def rename(self, source, target_path, source_collection=None, overwrite=False):
 +        """Move a file or subcollection from `source_collection` to a new path in this collection.
  
 -        # Determine the name to use.
 -        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
 +        :source:
 +          A string with a path to source file or subcollection.
  
 -        if not target_name:
 -            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
 +        :target_path:
 +          Destination file or path.  If the target path already exists and is a
 +          subcollection, the item will be placed inside the subcollection.  If
 +          the target path already exists and is a file, this will raise an error
 +          unless you specify `overwrite=True`.
  
 -        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
 +        :source_collection:
 +          Collection to copy `source_path` from (default `self`)
  
 -        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
 -            target_dir = target_dir[target_name]
 -            target_name = sourcecomponents[-1]
 +        :overwrite:
 +          Whether to overwrite target file if it already exists.
 +        """
  
 -        target_dir.add(source_obj, target_name, overwrite)
 +        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
 +        if not source_obj.writable():
 +            raise IOError(errno.EROFS, "Source collection must be writable.")
 +        target_dir.add(source_obj, target_name, overwrite, True)
  
-     @synchronized
+     def portable_manifest_text(self, stream_name="."):
+         """Get the manifest text for this collection, sub collections and files.
+         This method does not flush outstanding blocks to Keep.  It will return
+         a normalized manifest with access tokens stripped.
+         :stream_name:
+           Name to use for this stream (directory)
+         """
+         return self._get_manifest_text(stream_name, True, True)
      def manifest_text(self, stream_name=".", strip=False, normalize=False):
          """Get the manifest text for this collection, sub collections and files.
  
  
      def portable_data_hash(self):
          """Get the portable data hash for this collection's manifest."""
-         stripped = self.manifest_text(strip=True)
+         stripped = self.portable_manifest_text()
          return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
  
 +    @synchronized
 +    def subscribe(self, callback):
 +        if self._callback is None:
 +            self._callback = callback
 +        else:
 +            raise errors.ArgumentError("A callback is already set on this collection.")
 +
 +    @synchronized
 +    def unsubscribe(self):
 +        if self._callback is not None:
 +            self._callback = None
 +
 +    @synchronized
 +    def notify(self, event, collection, name, item):
 +        if self._callback:
 +            self._callback(event, collection, name, item)
 +        self.root_collection().notify(event, collection, name, item)
 +
      @synchronized
      def __eq__(self, other):
          if other is self:
@@@ -1399,9 -1373,24 +1442,11 @@@ class Collection(RichCollectionBase)
  
              self._manifest_locator = self._api_response["uuid"]
  
-         self._manifest_text = text
-         self.set_unmodified()
+             self._manifest_text = text
+             self.set_unmodified()
+         return text
  
 -    @synchronized
 -    def subscribe(self, callback):
 -        self.callbacks.append(callback)
 -
 -    @synchronized
 -    def unsubscribe(self, callback):
 -        self.callbacks.remove(callback)
 -
 -    @synchronized
 -    def notify(self, event, collection, name, item):
 -        for c in self.callbacks:
 -            c(event, collection, name, item)
 -
      @synchronized
      def _import_manifest(self, manifest_text):
          """Import a manifest into a `Collection`.
Simple merge
index 3041e28d0a310d2d755aaa621fa32f2314eae90c,3bba841b6d3cabae2a694457ca8088ea88d6957b..f89ac72e66779f14affe59c00b7cd8b314b3ae83
@@@ -569,8 -591,11 +591,11 @@@ class BlockManagerTest(unittest.TestCas
          mockkeep = mock.MagicMock()
          blockmanager = arvados.arvfile._BlockManager(mockkeep)
          bufferblock = blockmanager.alloc_bufferblock()
 -        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+         bufferblock.owner = mock.MagicMock()
++        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
          bufferblock.append("foo")
          blockmanager.commit_all()
+         self.assertTrue(bufferblock.owner.flush.called)
          self.assertTrue(mockkeep.put.called)
          self.assertEqual(bufferblock.state(), arvados.arvfile._BufferBlock.COMMITTED)
          self.assertIsNone(bufferblock.buffer_view)
          mockkeep.put.side_effect = arvados.errors.KeepWriteError("fail")
          blockmanager = arvados.arvfile._BlockManager(mockkeep)
          bufferblock = blockmanager.alloc_bufferblock()
 -        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+         bufferblock.owner = mock.MagicMock()
++        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
          bufferblock.append("foo")
          with self.assertRaises(arvados.errors.KeepWriteError) as err:
              blockmanager.commit_all()
index 95b6dbe6d1943ad0b348d35bb5abf29eebf79985,d3198be473f6819006bcbb4c1c6ddda1a53c9361..c882971534c2f223f9b1649c28995683131a1d5a
@@@ -884,26 -884,8 +884,26 @@@ class NewCollectionTestCase(unittest.Te
      def test_copy_to_new_dir(self):
          c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
          c.copy("count1.txt", "foo/")
-         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.manifest_text())
+         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
  
 +    def test_rename_file(self):
 +        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.rename("count1.txt", "count2.txt")
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
 +
 +    def test_move_file_to_dir(self):
 +        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c.mkdirs("foo")
 +        c.rename("count1.txt", "foo/count2.txt")
 +        self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
 +
 +    def test_move_file_to_other(self):
 +        c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
 +        c2 = Collection()
 +        c2.rename("count1.txt", "count2.txt", source_collection=c1)
 +        self.assertEqual("", c1.manifest_text())
 +        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
 +
      def test_clone(self):
          c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
          cl = c.clone()