9701: Use a collections.OrderedDict instead of a simple dict to hold bufferblocks...
[arvados.git] / sdk / python / arvados / arvfile.py
index 5befa193d3af91c9bd20a25d284c105729281900..85366d2fdcbc0c5c120c8fb38c036f4436f1754a 100644
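
Why OrderedDict: the new repack_small_blocks() below walks _bufferblocks.values() and packs small blocks in the order they were created, so the container must preserve insertion order; on Python 2 a plain dict does not. A minimal sketch of the guarantee being relied on (illustrative, not part of the diff):

    import collections

    # OrderedDict yields entries in insertion order, so bufferblocks are
    # visited (and repacked) in the order the files were written.
    blocks = collections.OrderedDict()
    blocks["bufferblock0"] = "first small block"
    blocks["bufferblock1"] = "second small block"
    assert list(blocks) == ["bufferblock0", "bufferblock1"]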
@@ -10,6 +10,7 @@ import copy
 import errno
 import re
 import logging
+import collections
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -108,6 +109,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         cache_pos, cache_data = self._readline_cache
         if self.tell() == cache_pos:
             data = [cache_data]
+            self._filepos += len(cache_data)
         else:
             data = ['']
         data_size = len(data[-1])
@@ -123,13 +125,14 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         except ValueError:
             nextline_index = len(data)
         nextline_index = min(nextline_index, size)
+        self._filepos -= len(data) - nextline_index
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
     @_FileLikeObjectBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
-        for segment in self.readall(size, num_retries):
+        for segment in self.readall(size, num_retries=num_retries):
             data = decompress(segment)
             if data:
                 yield data
@@ -311,28 +314,30 @@ class _BufferBlock(object):
         else:
             raise AssertionError("Buffer block is not writable")
 
+    STATE_TRANSITIONS = frozenset([
+            (WRITABLE, PENDING),
+            (PENDING, COMMITTED),
+            (PENDING, ERROR),
+            (ERROR, PENDING)])
+
     @synchronized
     def set_state(self, nextstate, val=None):
-        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
-            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED) or
-            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.ERROR) or
-            (self._state == _BufferBlock.ERROR and nextstate == _BufferBlock.PENDING)):
-            self._state = nextstate
-
-            if self._state == _BufferBlock.PENDING:
-                self.wait_for_commit.clear()
-
-            if self._state == _BufferBlock.COMMITTED:
-                self._locator = val
-                self.buffer_view = None
-                self.buffer_block = None
-                self.wait_for_commit.set()
-
-            if self._state == _BufferBlock.ERROR:
-                self.error = val
-                self.wait_for_commit.set()
-        else:
-            raise StateChangeError("Invalid state change from %s to %s" % (self.state, nextstate), self.state, nextstate)
+        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
+            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
+        self._state = nextstate
+
+        if self._state == _BufferBlock.PENDING:
+            self.wait_for_commit.clear()
+
+        if self._state == _BufferBlock.COMMITTED:
+            self._locator = val
+            self.buffer_view = None
+            self.buffer_block = None
+            self.wait_for_commit.set()
+
+        if self._state == _BufferBlock.ERROR:
+            self.error = val
+            self.wait_for_commit.set()
 
     @synchronized
     def state(self):
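
The rewritten set_state() above swaps a chain of boolean tests for a lookup in a frozenset of (current, next) pairs, rejecting illegal transitions before any state is mutated. The same pattern in isolation (a sketch with stand-in names, not the SDK API):

    # Legal transitions; (ERROR, PENDING) permits retrying a failed upload.
    STATE_TRANSITIONS = frozenset([
        ("WRITABLE", "PENDING"),
        ("PENDING", "COMMITTED"),
        ("PENDING", "ERROR"),
        ("ERROR", "PENDING")])

    class Block(object):
        def __init__(self):
            self._state = "WRITABLE"

        def set_state(self, nextstate):
            if (self._state, nextstate) not in STATE_TRANSITIONS:
                raise ValueError("invalid state change from %s to %s"
                                 % (self._state, nextstate))
            self._state = nextstate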
@@ -398,10 +403,10 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep):
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
@@ -410,6 +415,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -460,7 +466,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -489,7 +498,7 @@ class _BlockManager(object):
             for i in xrange(0, self.num_put_threads):
                 thread = threading.Thread(target=self._commit_bufferblock_worker)
                 self._put_threads.append(thread)
-                thread.daemon = False
+                thread.daemon = True
                 thread.start()
 
     def _block_prefetch_worker(self):
@@ -541,8 +550,30 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def __del__(self):
-        self.stop_threads()
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False):
+        """Packs small blocks together before uploading"""
+        # Search blocks ready for getting packed together before being committed to Keep.
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
+
+        # Check if there are enough small blocks for filling up one in full
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
+            self._bufferblocks[new_bb.blockid] = new_bb
+            size = 0
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                size += bb.size()
+                arvfile = bb.owner
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
+                bb.clear()
+                del self._bufferblocks[bb.blockid]
+            self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -553,24 +584,32 @@ class _BlockManager(object):
         :sync:
           If `sync` is True, upload the block synchronously.
           If `sync` is False, upload the block asynchronously.  This will
-          return immediately unless if the upload queue is at capacity, in
+          return immediately unless the upload queue is at capacity, in
           which case it will wait on an upload queue slot.
 
         """
-
         try:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
         except StateChangeError as e:
-            if e.state == _BufferBlock.PENDING and sync:
-                block.wait_for_commit.wait()
-                if block.state() == _BufferBlock.ERROR:
-                    raise block.error
-            return
+            if e.state == _BufferBlock.PENDING:
+                if sync:
+                    block.wait_for_commit.wait()
+                else:
+                    return
+            if block.state() == _BufferBlock.COMMITTED:
+                return
+            elif block.state() == _BufferBlock.ERROR:
+                raise block.error
+            else:
+                raise
 
         if sync:
             try:
-                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -615,11 +654,13 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() != _BufferBlock.COMMITTED:
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
                 v.owner.flush(sync=False)
 
         with self.lock:
@@ -639,7 +680,6 @@ class _BlockManager(object):
             if v.owner:
                 v.owner.flush(sync=True)
 
-
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
 
@@ -653,9 +693,13 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
+        if self._keep.get_from_cache(locator) is not None:
+            return
+
         with self.lock:
             if locator in self._bufferblocks:
                 return
+
         self.start_get_threads()
         self._prefetch_queue.put(locator)
 
@@ -683,6 +727,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
         """
         self.parent = parent
         self.name = name
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -759,9 +804,13 @@ class ArvadosFile(object):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def set_segments(self, segs):
+        self._segments = segs
+
     @synchronized
     def set_committed(self):
-        """Set committed flag to False"""
+        """Set committed flag to True"""
         self._committed = True
 
     @synchronized
@@ -769,6 +818,34 @@ class ArvadosFile(object):
         """Get whether this is committed or not."""
         return self._committed
 
         """Get whether this is committed or not."""
         return self._committed
 
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
+
+    @synchronized
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
+    def closed(self):
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
+
     @must_be_writable
     @synchronized
     def truncate(self, size):
@@ -811,19 +888,25 @@ class ArvadosFile(object):
         with self.lock:
             if size == 0 or offset >= self.size():
                 return ''
-            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
             readsegs = locators_and_ranges(self._segments, offset, size)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
 
-        for lr in prefetch:
-            self.parent._my_block_manager().block_prefetch(lr.locator)
-
+        locs = set()
         data = []
         for lr in readsegs:
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
-                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+                blockview = memoryview(block)
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                locs.add(lr.locator)
             else:
                 break
+
+        for lr in prefetch:
+            if lr.locator not in locs:
+                self.parent._my_block_manager().block_prefetch(lr.locator)
+                locs.add(lr.locator)
+
         return ''.join(data)
 
     def _repack_writes(self, num_retries):
@@ -918,7 +1001,7 @@ class ArvadosFile(object):
                 bb = self.parent._my_block_manager().get_bufferblock(s.locator)
                 if bb:
                     if bb.state() != _BufferBlock.COMMITTED:
-                        self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=True)
+                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
                     to_delete.add(s.locator)
                     s.locator = bb.locator()
             for s in to_delete:
@@ -1044,6 +1127,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1059,7 +1143,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     @retry_method
     def writelines(self, seq, num_retries=None):
         for s in seq:
-            self.write(s, num_retries)
+            self.write(s, num_retries=num_retries)
 
     @_FileLikeObjectBase._before_close
     def truncate(self, size=None):
@@ -1075,5 +1159,5 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     def close(self):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()
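
Net effect for callers: writing many small files no longer yields one Keep block per file, because close() now routes through remove_writer() and the repacker instead of flushing unconditionally. A hedged usage sketch (standard Collection API from the arvados Python SDK; exact packing behavior depends on the SDK version):

    import arvados.collection

    c = arvados.collection.Collection()
    # Each file is tiny; once its writer closes, its WRITABLE block
    # becomes a candidate for repack_small_blocks(), so small files
    # end up sharing Keep blocks.
    for n in range(100):
        with c.open("file%d.txt" % n, "w") as f:
            f.write("a few bytes of data\n")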