Merge branch '11305-migrate-docker19-doc'
[arvados.git] / sdk / python / arvados / arvfile.py
index ebefc5bfcea0a88f1048a9d2ee192b34e3a24e96..a2ec76a0761cb43a43731e5e31e9638f0dbf3019 100644 (file)
@@ -16,7 +16,7 @@ import uuid
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
-from ._ranges import locators_and_ranges, replace_range, Range
+from ._ranges import locators_and_ranges, replace_range, Range, LocatorAndRange
 from .retry import retry_method
 
 MOD = "mod"
@@ -99,10 +99,20 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         if pos < 0L:
             raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
         self._filepos = pos
+        return self._filepos
 
     def tell(self):
         return self._filepos
 
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
@@ -174,13 +184,13 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         return ''.join(data).splitlines(True)
 
     def size(self):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def read(self, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def readfrom(self, start, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
 
 class StreamFileReader(ArvadosFileReaderBase):
@@ -279,6 +289,7 @@ class _BufferBlock(object):
     PENDING = 1
     COMMITTED = 2
     ERROR = 3
+    DELETED = 4
 
     def __init__(self, blockid, starting_capacity, owner):
         """
@@ -373,10 +384,54 @@ class _BufferBlock(object):
 
     @synchronized
     def clear(self):
+        self._state = _BufferBlock.DELETED
         self.owner = None
         self.buffer_block = None
         self.buffer_view = None
 
+    @synchronized
+    def repack_writes(self):
+        """Optimize buffer block by repacking segments in file sequence.
+
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file.  This makes for inefficient, fragmented manifests.  Attempt
+        to optimize by repacking writes in file sequence.
+
+        """
+        if self._state != _BufferBlock.WRITABLE:
+            raise AssertionError("Cannot repack non-writable block")
+
+        segs = self.owner.segments()
+
+        # Collect the segments that reference the buffer block.
+        bufferblock_segs = [s for s in segs if s.locator == self.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
+        write_total = sum([s.range_size for s in bufferblock_segs])
+
+        if write_total < self.size() or len(bufferblock_segs) > 1:
+            # If the block holds overwritten (no longer referenced) data, or
+            # more than one segment references it (out-of-order writes, which
+            # fragment the manifest), optimize by re-packing into a new buffer.
+            contents = self.buffer_view[0:self.write_pointer].tobytes()
+            new_bb = _BufferBlock(None, write_total, None)
+            for t in bufferblock_segs:
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
+                t.segment_offset = new_bb.size() - t.range_size
+
+            self.buffer_block = new_bb.buffer_block
+            self.buffer_view = new_bb.buffer_view
+            self.write_pointer = new_bb.write_pointer
+            self._locator = None
+            new_bb.clear()
+            self.owner.set_segments(segs)
+
+    def __repr__(self):
+        return "<BufferBlock %s>" % (self.blockid)
+
 
 class NoopLock(object):
     def __enter__(self):
@@ -450,7 +505,7 @@ class _BlockManager(object):
 
     def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "%s" % uuid.uuid4()
+            blockid = str(uuid.uuid4())
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -466,7 +521,7 @@ class _BlockManager(object):
           ArvadosFile that owns the new block
 
         """
-        new_blockid = "bufferblock%i" % len(self._bufferblocks)
+        new_blockid = str(uuid.uuid4())
         bufferblock = block.clone(new_blockid, owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
@@ -571,43 +626,56 @@ class _BlockManager(object):
     @synchronized
     def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
+
         self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if not (force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE)):
+            return
 
-            # Search blocks ready for getting packed together before being committed to Keep.
-            # A WRITABLE block always has an owner.
-            # A WRITABLE block with its owner.closed() implies that it's
-            # size is <= KEEP_BLOCK_SIZE/2.
-            try:
-                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-            except AttributeError:
-                # Writable blocks without owner shouldn't exist.
-                raise UnownedBlockError()
+        # Search blocks ready for getting packed together before being committed to Keep.
+        # A WRITABLE block always has an owner.
+        # A WRITABLE block with its owner.closed() implies that its
+        # size is <= KEEP_BLOCK_SIZE/2.
+        try:
+            small_blocks = [b for b in self._bufferblocks.values()
+                            if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        except AttributeError:
+            # Writable blocks without owner shouldn't exist.
+            raise UnownedBlockError()
+
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
 
-            if len(small_blocks) <= 1:
-                # Not enough small blocks for repacking
-                return
+        for bb in small_blocks:
+            bb.repack_writes()
 
-            # Update the pending write size count with its true value, just in case
-            # some small file was opened, written and closed several times.
-            self._pending_write_size = sum([b.size() for b in small_blocks])
-            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
-                return
+        # Update the pending write size count with its true value, just in case
+        # some small file was opened, written and closed several times.
+        self._pending_write_size = sum([b.size() for b in small_blocks])
 
-            new_bb = self._alloc_bufferblock()
-            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
-                bb = small_blocks.pop(0)
-                arvfile = bb.owner
-                self._pending_write_size -= bb.size()
-                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                arvfile.set_segments([Range(new_bb.blockid,
-                                            0,
-                                            bb.size(),
-                                            new_bb.write_pointer - bb.size())])
-                self._delete_bufferblock(bb.blockid)
-            self.commit_bufferblock(new_bb, sync=sync)
+        if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+            return
+
+        new_bb = self._alloc_bufferblock()
+        files = []
+        while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+            bb = small_blocks.pop(0)
+            self._pending_write_size -= bb.size()
+            new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+            files.append((bb, new_bb.write_pointer - bb.size()))
+
+        self.commit_bufferblock(new_bb, sync=sync)
+
+        for bb, new_bb_segment_offset in files:
+            newsegs = bb.owner.segments()
+            for s in newsegs:
+                if s.locator == bb.blockid:
+                    s.locator = new_bb.locator()
+                    s.segment_offset = new_bb_segment_offset+s.segment_offset
+            bb.owner.set_segments(newsegs)
+            self._delete_bufferblock(bb.blockid)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -659,7 +727,12 @@ class _BlockManager(object):
     @synchronized
     def get_padding_block(self):
         """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
-        when using truncate() to extend the size of a file."""
+        when using truncate() to extend the size of a file.
+
+        For reference (and possible future optimization), the md5sum of the
+        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+        """
 
         if self.padding_block is None:
             self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
@@ -984,37 +1057,6 @@ class ArvadosFile(object):
 
         return ''.join(data)
 
-    def _repack_writes(self, num_retries):
-        """Optimize buffer block by repacking segments in file sequence.
-
-        When the client makes random writes, they appear in the buffer block in
-        the sequence they were written rather than the sequence they appear in
-        the file.  This makes for inefficient, fragmented manifests.  Attempt
-        to optimize by repacking writes in file sequence.
-
-        """
-        segs = self._segments
-
-        # Collect the segments that reference the buffer block.
-        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
-
-        if len(bufferblock_segs) > 1:
-            # Collect total data referenced by segments (could be smaller than
-            # bufferblock size if a portion of the file was written and
-            # then overwritten).
-            write_total = sum([s.range_size for s in bufferblock_segs])
-
-            # If there's more than one segment referencing this block, it is
-            # due to out-of-order writes and will produce a fragmented
-            # manifest, so try to optimize by re-packing into a new buffer.
-            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
-            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
-            for t in bufferblock_segs:
-                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
-                t.segment_offset = new_bb.size() - t.range_size
-
-            self._current_bblock = new_bb
-
     @must_be_writable
     @synchronized
     def writeto(self, offset, data, num_retries):
@@ -1045,7 +1087,7 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes(num_retries)
+            self._current_bblock.repack_writes()
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
                 self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1072,8 +1114,9 @@ class ArvadosFile(object):
 
         if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
             if self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
-            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+                self._current_bblock.repack_writes()
+            if self._current_bblock.state() != _BufferBlock.DELETED:
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
 
         if sync:
             to_delete = set()
@@ -1122,17 +1165,17 @@ class ArvadosFile(object):
                       normalize=False, only_committed=False):
         buf = ""
         filestream = []
-        for segment in self.segments:
+        for segment in self._segments:
             loc = segment.locator
             if self.parent._my_block_manager().is_bufferblock(loc):
                 if only_committed:
                     continue
-                loc = self._bufferblocks[loc].calculate_locator()
+                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
-            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
+            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                  segment.segment_offset, segment.range_size))
-        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
+        buf += ' '.join(normalize_stream(stream_name, {self.name: filestream}))
         buf += "\n"
         return buf
 
@@ -1212,6 +1255,9 @@ class ArvadosFileWriter(ArvadosFileReader):
         self.mode = mode
         self.arvadosfile.add_writer(self)
 
+    def writable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def write(self, data, num_retries=None):
@@ -1233,8 +1279,6 @@ class ArvadosFileWriter(ArvadosFileReader):
         if size is None:
             size = self._filepos
         self.arvadosfile.truncate(size)
-        if self._filepos > self.size():
-            self._filepos = self.size()
 
     @_FileLikeObjectBase._before_close
     def flush(self):