11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / arvados / arvfile.py
index 931a3a198db9407fd72b274225821cc3f12ca6d7..2fc9c73afe031543ab7afb67edcd2e7a5c4c7d6f 100644 (file)
@@ -24,7 +24,7 @@ from . import config
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
 from .retry import retry_method
 
 MOD = "mod"
@@ -300,6 +300,7 @@ class _BufferBlock(object):
     PENDING = 1
     COMMITTED = 2
     ERROR = 3
+    DELETED = 4
 
     def __init__(self, blockid, starting_capacity, owner):
         """
@@ -396,10 +397,54 @@ class _BufferBlock(object):
 
     @synchronized
     def clear(self):
+        self._state = _BufferBlock.DELETED
         self.owner = None
         self.buffer_block = None
         self.buffer_view = None
 
+    @synchronized
+    def repack_writes(self):
+        """Optimize buffer block by repacking segments in file sequence.
+
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file.  This makes for inefficient, fragmented manifests.  Attempt
+        to optimize by repacking writes in file sequence.
+
+        """
+        if self._state != _BufferBlock.WRITABLE:
+            raise AssertionError("Cannot repack non-writable block")
+
+        segs = self.owner.segments()
+
+        # Collect the segments that reference the buffer block.
+        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
+        write_total = sum([s.range_size for s in bufferblock_segs])
+
+        if write_total < self.size() or len(bufferblock_segs) > 1:
+            # If the segments reference less data than the buffer holds (some
+            # writes were later overwritten), or more than one segment
+            # references this block (out-of-order writes), the result would be
+            # a bloated or fragmented manifest, so try to optimize by
+            # re-packing the live data into a new buffer.
+            contents = self.buffer_view[0:self.write_pointer].tobytes()
+            new_bb = _BufferBlock(None, write_total, None)
+            for t in bufferblock_segs:
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
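+                # The bytes just appended sit at the tail of new_bb, so the
+                # segment's new offset is (new size - range_size).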
+                t.segment_offset = new_bb.size() - t.range_size
+
+            self.buffer_block = new_bb.buffer_block
+            self.buffer_view = new_bb.buffer_view
+            self.write_pointer = new_bb.write_pointer
+            self._locator = None
+            new_bb.clear()
+            self.owner.set_segments(segs)
+
+    def __repr__(self):
+        return "<BufferBlock %s>" % (self.blockid)
+
 
 class NoopLock(object):
     def __enter__(self):
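To see what repack_writes() buys, here is a standalone model of the repack (a
minimal sketch using plain (offset, size) tuples rather than the SDK's Range
objects): writes land in the buffer in write order, and repacking copies the
referenced bytes back out in file order, updating each offset exactly as the
method above does with new_bb.size() - t.range_size.

    # Two out-of-order writes: the file's first 3 bytes were written second,
    # so they sit at buffer offset 3.  Segments are (segment_offset, size)
    # listed in file order.
    buf = b"bbbaaa"
    segs = [(3, 3), (0, 3)]
    new_buf = b""
    new_segs = []
    for off, size in segs:
        new_buf += buf[off:off + size]
        new_segs.append((len(new_buf) - size, size))
    assert new_buf == b"aaabbb"
    assert new_segs == [(0, 3), (3, 3)]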
@@ -473,7 +518,7 @@ class _BlockManager(object):
 
     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "%s" % uuid.uuid4()
+            blockid = str(uuid.uuid4())
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -489,7 +534,7 @@ class _BlockManager(object):
           ArvadosFile that owns the new block
 
         """
-        new_blockid = "bufferblock%i" % len(self._bufferblocks)
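+        # Use a random UUID for the new block id; a counter based on
+        # len(self._bufferblocks) could repeat once blocks are deleted.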
+        new_blockid = str(uuid.uuid4())
         bufferblock = block.clone(new_blockid, owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -594,43 +639,57 @@ class _BlockManager(object):
     @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
+
         self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return
 
-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            try:
-                small_blocks = [b for b in listvalues(self._bufferblocks) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-            except AttributeError:
-                # Writable blocks without owner shouldn't exist.
-                raise UnownedBlockError()
+        # Search for blocks that are ready to be packed together before
+        # being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block whose owner is closed() implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
+        try:
+            small_blocks = [b for b in listvalues(self._bufferblocks)
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        except AttributeError:
+            # Writable blocks without an owner shouldn't exist.
+            raise UnownedBlockError()
+
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
 
-            if len(small_blocks) <= 1:
-                # Not enough small blocks for repacking
-                return
+        for bb in small_blocks:
+            bb.repack_writes()
 
-            # Update the pending write size count with its true value, just in case
-            # some small file was opened, written and closed several times.
-            self._pending_write_size = sum([b.size() for b in small_blocks])
-            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                return
+        # Recompute the pending write size from the blocks themselves, in
+        # case some small file was opened, written, and closed several times.
+        self._pending_write_size = sum([b.size() for b in small_blocks])
 
-            new_bb = self._alloc_bufferblock()
-            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                bb = small_blocks.pop(0)
-                arvfile = bb.owner
-                self._pending_write_size -= bb.size()
-                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                arvfile.set_segments([Range(new_bb.blockid,
-                                            0,
-                                            bb.size(),
-                                            new_bb.write_pointer - bb.size())])
-                self._delete_bufferblock(bb.blockid)
-            self.commit_bufferblock(new_bb, sync=sync)
+        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+            return
+
+        new_bb = self._alloc_bufferblock()
+        files = []
+        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            bb = small_blocks.pop(0)
+            self._pending_write_size -= bb.size()
+            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+            files.append((bb, new_bb.write_pointer - bb.size()))
+
+        self.commit_bufferblock(new_bb, sync=sync)
+
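+        # new_bb is now committed; repoint each small file's segments at it.
+        # A segment's data sits at (the base offset where bb landed inside
+        # new_bb) plus its old offset within bb.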
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.locator()
+                    s.segment_offset = new_bb_segment_offset + s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -1012,37 +1071,6 @@ class ArvadosFile(object):
 
         return b''.join(data)
 
-    def _repack_writes(self, num_retries):
-        """Optimize buffer block by repacking segments in file sequence.
-
-        When the client makes random writes, they appear in the buffer block in
-        the sequence they were written rather than the sequence they appear in
-        the file.  This makes for inefficient, fragmented manifests.  Attempt
-        to optimize by repacking writes in file sequence.
-
-        """
-        segs = self._segments
-
-        # Collect the segments that reference the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
-        # Collect total data referenced by segments (could be smaller than
-        # bufferblock size if a portion of the file was written and
-        # then overwritten).
-        write_total = sum([s.range_size for s in bufferblock_segs])
-
-        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
-            # If there's more than one segment referencing this block, it is
-            # due to out-of-order writes and will produce a fragmented
-            # manifest, so try to optimize by re-packing into a new buffer.
-            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-            for t in bufferblock_segs:
-                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                t.segment_offset = new_bb.size() - t.range_size
-
-            self._current_bblock = new_bb
-
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
@@ -1075,7 +1103,7 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes(num_retries)
+            self._current_bblock.repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1102,8 +1130,9 @@ class ArvadosFile(object):
 
         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
             if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
-            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+                self._current_bblock.repack_writes()
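+            # repack_small_blocks() may have already merged this block into a
+            # bigger one and deleted it; only commit blocks that still exist.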
+            if self._current_bblock.state() != _BufferBlock.DELETED:
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
 
         if sync:
             to_delete = set()
@@ -1152,17 +1181,17 @@ class ArvadosFile(object):
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments:
+        for segment in self._segments:
             loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:
                     continue
-                loc = self._bufferblocks[loc].calculate_locator()
+                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                  segment.segment_offset, segment.range_size))
-        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
         buf += "\n"
         return buf
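For reference, a single small file rendered through this path yields a
manifest line of the form (a hand-written example, not output captured from
this code):

    . 781e5e245d69b566979b86e28d23f2c7+10 0:10:example.txt

that is, the stream name, one or more block locators (md5 hash plus block
size), and position:size:name file tokens indexing into the concatenated
blocks.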