9701: Use a collections.OrderedDict instead of a simple dict to hold bufferblocks...
[arvados.git] / sdk / python / arvados / arvfile.py
index 792c81f1e6868e678233ef2508659d3fec621e96..85366d2fdcbc0c5c120c8fb38c036f4436f1754a 100644 (file)
@@ -10,8 +10,9 @@ import copy
 import errno
 import re
 import logging
+import collections
 
-from .errors import KeepWriteError, AssertionError
+from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
 from ._normalize_stream import normalize_stream
 from ._ranges import locators_and_ranges, replace_range, Range
@@ -108,6 +109,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         cache_pos, cache_data = self._readline_cache
         if self.tell() == cache_pos:
             data = [cache_data]
+            self._filepos += len(cache_data)
         else:
             data = ['']
         data_size = len(data[-1])
@@ -123,13 +125,14 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         except ValueError:
             nextline_index = len(data)
         nextline_index = min(nextline_index, size)
+        self._filepos -= len(data) - nextline_index
         self._readline_cache = (self.tell(), data[nextline_index:])
         return data[:nextline_index]
 
     @_FileLikeObjectBase._before_close
     @retry_method
     def decompress(self, decompress, size, num_retries=None):
-        for segment in self.readall(size, num_retries):
+        for segment in self.readall(size, num_retries=num_retries):
             data = decompress(segment)
             if data:
                 yield data
@@ -236,6 +239,13 @@ def synchronized(orig_func):
             return orig_func(self, *args, **kwargs)
     return synchronized_wrapper
 
+
+class StateChangeError(Exception):
+    def __init__(self, message, state, nextstate):
+        super(StateChangeError, self).__init__(message)
+        self.state = state
+        self.nextstate = nextstate
+
 class _BufferBlock(object):
     """A stand-in for a Keep block that is in the process of being written.
 
@@ -259,6 +269,7 @@ class _BufferBlock(object):
     WRITABLE = 0
     PENDING = 1
     COMMITTED = 2
+    ERROR = 3
 
     def __init__(self, blockid, starting_capacity, owner):
         """
 
     def __init__(self, blockid, starting_capacity, owner):
         """
@@ -280,6 +291,8 @@ class _BufferBlock(object):
         self._locator = None
         self.owner = owner
         self.lock = threading.Lock()
+        self.wait_for_commit = threading.Event()
+        self.error = None
 
     @synchronized
     def append(self, data):
@@ -301,17 +314,30 @@ class _BufferBlock(object):
         else:
             raise AssertionError("Buffer block is not writable")
 
+    STATE_TRANSITIONS = frozenset([
+            (WRITABLE, PENDING),
+            (PENDING, COMMITTED),
+            (PENDING, ERROR),
+            (ERROR, PENDING)])
+
     @synchronized
-    def set_state(self, nextstate, loc=None):
-        if ((self._state == _BufferBlock.WRITABLE and nextstate == _BufferBlock.PENDING) or
-            (self._state == _BufferBlock.PENDING and nextstate == _BufferBlock.COMMITTED)):
-            self._state = nextstate
-            if self._state == _BufferBlock.COMMITTED:
-                self._locator = loc
-                self.buffer_view = None
-                self.buffer_block = None
-        else:
-            raise AssertionError("Invalid state change from %s to %s" % (self.state, nextstate))
+    def set_state(self, nextstate, val=None):
+        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
+            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
+        self._state = nextstate
+
+        if self._state == _BufferBlock.PENDING:
+            self.wait_for_commit.clear()
+
+        if self._state == _BufferBlock.COMMITTED:
+            self._locator = val
+            self.buffer_view = None
+            self.buffer_block = None
+            self.wait_for_commit.set()
+
+        if self._state == _BufferBlock.ERROR:
+            self.error = val
+            self.wait_for_commit.set()
 
     @synchronized
     def state(self):
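Note: the STATE_TRANSITIONS table above replaces the old chained conditionals with a declarative whitelist, and wait_for_commit gives synchronous callers something to block on. A minimal standalone sketch of how the pieces are meant to interact (illustration only, not part of the patch):

    import threading

    WRITABLE, PENDING, COMMITTED, ERROR = range(4)
    STATE_TRANSITIONS = frozenset([
        (WRITABLE, PENDING),
        (PENDING, COMMITTED),
        (PENDING, ERROR),
        (ERROR, PENDING)])  # an ERROR block may be resubmitted

    class Block(object):
        def __init__(self):
            self._state = WRITABLE
            self.wait_for_commit = threading.Event()

        def set_state(self, nextstate):
            if (self._state, nextstate) not in STATE_TRANSITIONS:
                raise ValueError("invalid transition %s -> %s" % (self._state, nextstate))
            self._state = nextstate
            if nextstate in (COMMITTED, ERROR):
                # wake any thread blocked in a sync commit
                self.wait_for_commit.set()

    b = Block()
    b.set_state(PENDING)
    b.set_state(COMMITTED)
    b.wait_for_commit.wait()  # returns immediately once the event is set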
@@ -331,7 +357,7 @@ class _BufferBlock(object):
     @synchronized
     def clone(self, new_blockid, owner):
         if self._state == _BufferBlock.COMMITTED:
-            raise AssertionError("Can only duplicate a writable or pending buffer block")
+            raise AssertionError("Cannot duplicate committed buffer block")
         bufferblock = _BufferBlock(new_blockid, self.size(), owner)
         bufferblock.append(self.buffer_view[0:self.size()])
         return bufferblock
@@ -361,7 +387,7 @@ def must_be_writable(orig_func):
     @functools.wraps(orig_func)
     def must_be_writable_wrapper(self, *args, **kwargs):
         if not self.writable():
-            raise IOError(errno.EROFS, "Collection must be writable.")
+            raise IOError(errno.EROFS, "Collection is read-only.")
         return orig_func(self, *args, **kwargs)
     return must_be_writable_wrapper
 
@@ -373,19 +399,23 @@ class _BlockManager(object):
     Collection of ArvadosFiles.
 
     """
-    def __init__(self, keep):
+
+    DEFAULT_PUT_THREADS = 2
+    DEFAULT_GET_THREADS = 2
+
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
-        self._put_errors = None
         self._put_threads = None
         self._prefetch_queue = None
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = 2
-        self.num_get_threads = 2
+        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
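Note: switching _bufferblocks from a plain dict to collections.OrderedDict makes iteration deterministic, so commit_all() and repack_small_blocks() visit blocks in allocation order. A quick illustration (the SDK targets Python 2, where plain dict ordering is arbitrary):

    import collections

    blocks = collections.OrderedDict()
    for i in xrange(3):
        blocks["bufferblock%i" % i] = object()

    print blocks.keys()  # always ['bufferblock0', 'bufferblock1', 'bufferblock2']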
@@ -427,6 +457,73 @@ class _BlockManager(object):
     def is_bufferblock(self, locator):
         return locator in self._bufferblocks
 
+    def _commit_bufferblock_worker(self):
+        """Background uploader thread."""
+
+        while True:
+            try:
+                bufferblock = self._put_queue.get()
+                if bufferblock is None:
+                    return
+
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
+
+            except Exception as e:
+                bufferblock.set_state(_BufferBlock.ERROR, e)
+            finally:
+                if self._put_queue is not None:
+                    self._put_queue.task_done()
+
+    @synchronized
+    def start_put_threads(self):
+        if self._put_threads is None:
+            # Start uploader threads.
+
+            # If we don't limit the Queue size, the upload queue can quickly
+            # grow to take up gigabytes of RAM if the writing process is
+            # generating data more quickly than it can be sent to the Keep
+            # servers.
+            #
+            # With two upload threads and a queue size of 2, this means up to 4
+            # blocks pending.  If they are full 64 MiB blocks, that means up to
+            # 256 MiB of internal buffering, which is the same size as the
+            # default download block cache in KeepClient.
+            self._put_queue = Queue.Queue(maxsize=2)
+
+            self._put_threads = []
+            for i in xrange(0, self.num_put_threads):
+                thread = threading.Thread(target=self._commit_bufferblock_worker)
+                self._put_threads.append(thread)
+                thread.daemon = True
+                thread.start()
+
+    def _block_prefetch_worker(self):
+        """The background downloader thread."""
+        while True:
+            try:
+                b = self._prefetch_queue.get()
+                if b is None:
+                    return
+                self._keep.get(b)
+            except Exception:
+                pass
+
+    @synchronized
+    def start_get_threads(self):
+        if self._prefetch_threads is None:
+            self._prefetch_queue = Queue.Queue()
+            self._prefetch_threads = []
+            for i in xrange(0, self.num_get_threads):
+                thread = threading.Thread(target=self._block_prefetch_worker)
+                self._prefetch_threads.append(thread)
+                thread.daemon = True
+                thread.start()
+
+
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -438,7 +535,6 @@ class _BlockManager(object):
                 t.join()
         self._put_threads = None
         self._put_queue = None
-        self._put_errors = None
 
         if self._prefetch_threads is not None:
             for t in self._prefetch_threads:
@@ -448,71 +544,78 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self._prefetch_queue = None
 
-    def commit_bufferblock(self, block, wait):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.stop_threads()
+
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False):
+        """Packs small blocks together before uploading"""
+        # Find bufferblocks that are ready to be packed together before being committed to Keep.
+        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+        if len(small_blocks) <= 1:
+            # Not enough small blocks for repacking
+            return
+
+        # Check whether the small blocks add up to at least one full-sized block
+        pending_write_size = sum([b.size() for b in small_blocks])
+        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
+            self._bufferblocks[new_bb.blockid] = new_bb
+            size = 0
+            while len(small_blocks) > 0 and (size + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
+                bb = small_blocks.pop(0)
+                size += bb.size()
+                arvfile = bb.owner
+                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
+                arvfile.set_segments([Range(new_bb.blockid, 0, bb.size(), size-bb.size())])
+                bb.clear()
+                del self._bufferblocks[bb.blockid]
+            self.commit_bufferblock(new_bb, sync=sync)
+
+    def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
 
         :block:
           The block object to upload
 
         """Initiate a background upload of a bufferblock.
 
         :block:
           The block object to upload
 
-        :wait:
-          If `wait` is True, upload the block synchronously.
-          If `wait` is False, upload the block asynchronously.  This will
-          return immediately unless if the upload queue is at capacity, in
+        :sync:
+          If `sync` is True, upload the block synchronously.
+          If `sync` is False, upload the block asynchronously.  This will
+          return immediately unless the upload queue is at capacity, in
           which case it will wait on an upload queue slot.
 
         """
-
-        def commit_bufferblock_worker(self):
-            """Background uploader thread."""
-
-            while True:
-                try:
-                    bufferblock = self._put_queue.get()
-                    if bufferblock is None:
-                        return
-
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
-                    bufferblock.set_state(_BufferBlock.COMMITTED, loc)
-
-                except Exception as e:
-                    self._put_errors.put((bufferblock.locator(), e))
-                finally:
-                    if self._put_queue is not None:
-                        self._put_queue.task_done()
-
-        if block.state() != _BufferBlock.WRITABLE:
-            return
-
-        if wait:
-            block.set_state(_BufferBlock.PENDING)
-            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
-            block.set_state(_BufferBlock.COMMITTED, loc)
-        else:
-            with self.lock:
-                if self._put_threads is None:
-                    # Start uploader threads.
-
-                    # If we don't limit the Queue size, the upload queue can quickly
-                    # grow to take up gigabytes of RAM if the writing process is
-                    # generating data more quickly than it can be send to the Keep
-                    # servers.
-                    #
-                    # With two upload threads and a queue size of 2, this means up to 4
-                    # blocks pending.  If they are full 64 MiB blocks, that means up to
-                    # 256 MiB of internal buffering, which is the same size as the
-                    # default download block cache in KeepClient.
-                    self._put_queue = Queue.Queue(maxsize=2)
-                    self._put_errors = Queue.Queue()
-
-                    self._put_threads = []
-                    for i in xrange(0, self.num_put_threads):
-                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
-                        self._put_threads.append(thread)
-                        thread.daemon = True
-                        thread.start()
-
+        try:
             # Mark the block as PENDING so as to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
+        except StateChangeError as e:
+            if e.state == _BufferBlock.PENDING:
+                if sync:
+                    block.wait_for_commit.wait()
+                else:
+                    return
+            if block.state() == _BufferBlock.COMMITTED:
+                return
+            elif block.state() == _BufferBlock.ERROR:
+                raise block.error
+            else:
+                raise
+
+        if sync:
+            try:
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                block.set_state(_BufferBlock.COMMITTED, loc)
+            except Exception as e:
+                block.set_state(_BufferBlock.ERROR, e)
+                raise
+        else:
+            self.start_put_threads()
             self._put_queue.put(block)
 
     @synchronized
@@ -547,37 +650,35 @@ class _BlockManager(object):
     def commit_all(self):
         """Commit all outstanding buffer blocks.
 
-        Unlike commit_bufferblock(), this is a synchronous call, and will not
-        return until all buffer blocks are uploaded.  Raises
-        KeepWriteError() if any blocks failed to upload.
+        This is a synchronous call, and will not return until all buffer blocks
+        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
             items = self._bufferblocks.items()
 
         for k,v in items:
-            if v.state() == _BufferBlock.WRITABLE:
-                v.owner.flush(False)
+            if v.state() != _BufferBlock.COMMITTED and v.owner:
+                v.owner.flush(sync=False)
 
         with self.lock:
             if self._put_queue is not None:
                 self._put_queue.join()
 
-                if not self._put_errors.empty():
-                    err = []
-                    try:
-                        while True:
-                            err.append(self._put_errors.get(False))
-                    except Queue.Empty:
-                        pass
+                err = []
+                for k,v in items:
+                    if v.state() == _BufferBlock.ERROR:
+                        err.append((v.locator(), v.error))
+                if err:
                     raise KeepWriteError("Error writing some blocks", err, label="block")
 
         for k,v in items:
-            # flush again with wait=True to remove committed bufferblocks from
+            # flush again with sync=True to remove committed bufferblocks from
             # the segments.
             if v.owner:
-                v.owner.flush(True)
-
+                v.owner.flush(sync=True)
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
 
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
@@ -592,28 +693,14 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        def block_prefetch_worker(self):
-            """The background downloader thread."""
-            while True:
-                try:
-                    b = self._prefetch_queue.get()
-                    if b is None:
-                        return
-                    self._keep.get(b)
-                except Exception:
-                    pass
+        if self._keep.get_from_cache(locator) is not None:
+            return
 
         with self.lock:
             if locator in self._bufferblocks:
                 return
-            if self._prefetch_threads is None:
-                self._prefetch_queue = Queue.Queue()
-                self._prefetch_threads = []
-                for i in xrange(0, self.num_get_threads):
-                    thread = threading.Thread(target=block_prefetch_worker, args=(self,))
-                    self._prefetch_threads.append(thread)
-                    thread.daemon = True
-                    thread.start()
+
+        self.start_get_threads()
         self._prefetch_queue.put(locator)
 
 
@@ -640,7 +727,8 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
         """
         self.parent = parent
         self.name = name
-        self._modified = True
+        self._writers = set()
+        self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
         for s in segments:
@@ -681,7 +769,7 @@ class ArvadosFile(object):
 
             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 
-        self._modified = True
+        self._committed = False
 
     def __eq__(self, other):
         if other is self:
@@ -717,14 +805,46 @@ class ArvadosFile(object):
         return not self.__eq__(other)
 
     @synchronized
-    def set_unmodified(self):
-        """Clear the modified flag"""
-        self._modified = False
+    def set_segments(self, segs):
+        self._segments = segs
+
+    @synchronized
+    def set_committed(self):
+        """Set committed flag to True"""
+        self._committed = True
+
+    @synchronized
+    def committed(self):
+        """Get whether this is committed or not."""
+        return self._committed
+
+    @synchronized
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
 
     @synchronized
-    def modified(self):
-        """Test the modified flag"""
-        return self._modified
+    def remove_writer(self, writer):
+        """
+        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
+        and do some block maintenance tasks.
+        """
+        self._writers.remove(writer)
+
+        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed, not small enough for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks()
+
+    def closed(self):
+        """
+        Get whether this file is closed. When the writers list is empty, the file
+        is considered closed.
+        """
+        return len(self._writers) == 0
 
     @must_be_writable
     @synchronized
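Note: the _writers set replaces the single modified flag with reference counting of open ArvadosFileWriter handles; a file only becomes a candidate for small-block repacking once every writer has closed it. Lifecycle sketch (collection is assumed to be an open, writable arvados Collection; setup elided):

    w1 = collection.open("data.txt", "w")  # add_writer() registers the handle
    w2 = collection.open("data.txt", "w")
    w1.close()   # remove_writer(); file still open through w2
    w2.close()   # closed() is now True, so repack_small_blocks() may fold it in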
@@ -752,7 +872,7 @@ class ArvadosFile(object):
                     new_segs.append(r)
 
             self._segments = new_segs
-            self._modified = True
+            self._committed = False
         elif size > self.size():
             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
@@ -768,19 +888,25 @@ class ArvadosFile(object):
         with self.lock:
             if size == 0 or offset >= self.size():
                 return ''
-            prefetch = locators_and_ranges(self._segments, offset, size + config.KEEP_BLOCK_SIZE)
             readsegs = locators_and_ranges(self._segments, offset, size)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
 
-        for lr in prefetch:
-            self.parent._my_block_manager().block_prefetch(lr.locator)
-
+        locs = set()
         data = []
         for lr in readsegs:
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
-                data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
+                blockview = memoryview(block)
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                locs.add(lr.locator)
             else:
                 break
+
+        for lr in prefetch:
+            if lr.locator not in locs:
+                self.parent._my_block_manager().block_prefetch(lr.locator)
+                locs.add(lr.locator)
+
         return ''.join(data)
 
     def _repack_writes(self, num_retries):
@@ -825,9 +951,15 @@ class ArvadosFile(object):
             raise ArgumentError("Offset is past the end of the file")
 
         if len(data) > config.KEEP_BLOCK_SIZE:
             raise ArgumentError("Offset is past the end of the file")
 
         if len(data) > config.KEEP_BLOCK_SIZE:
-            raise ArgumentError("Please append data in chunks smaller than %i bytes (config.KEEP_BLOCK_SIZE)" % (config.KEEP_BLOCK_SIZE))
+            # Chunk it up into smaller writes
+            n = 0
+            dataview = memoryview(data)
+            while n < len(data):
+                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
+                n += config.KEEP_BLOCK_SIZE
+            return
 
-        self._modified = True
+        self._committed = False
 
         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
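Note: writeto() no longer rejects writes larger than one Keep block; it re-invokes itself on KEEP_BLOCK_SIZE slices. The slicing loop on its own (KEEP_BLOCK_SIZE is 64 MiB in the SDK's config; chunked() here is an illustrative helper, not patch code):

    KEEP_BLOCK_SIZE = 64 << 20

    def chunked(data):
        """Yield successive block-sized pieces without copying the whole buffer."""
        view = memoryview(data)
        for n in xrange(0, len(data), KEEP_BLOCK_SIZE):
            yield view[n:n + KEEP_BLOCK_SIZE].tobytes()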
@@ -835,7 +967,7 @@ class ArvadosFile(object):
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
             self._repack_writes(num_retries)
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         self._current_bblock.append(data)
@@ -847,25 +979,35 @@ class ArvadosFile(object):
         return len(data)
 
     @synchronized
-    def flush(self, wait=True, num_retries=0):
-        """Flush bufferblocks to Keep."""
-        if self.modified():
-            if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes(num_retries)
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
-            if wait:
-                to_delete = set()
-                for s in self._segments:
-                    bb = self.parent._my_block_manager().get_bufferblock(s.locator)
-                    if bb:
-                        if bb.state() != _BufferBlock.COMMITTED:
-                            _logger.error("bufferblock %s is not committed" % (s.locator))
-                        else:
-                            to_delete.add(s.locator)
-                            s.locator = bb.locator()
-                for s in to_delete:
-                   self.parent._my_block_manager().delete_bufferblock(s)
+    def flush(self, sync=True, num_retries=0):
+        """Flush the current bufferblock to Keep.
+
+        :sync:
+          If True, commit the block synchronously and wait until it has been written to Keep.
+          If False, commit the block asynchronously and return immediately after it has been
+          placed on the Keep put queue.
+        """
+        if self.committed():
+            return
 
+        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
+            if self._current_bblock.state() == _BufferBlock.WRITABLE:
+                self._repack_writes(num_retries)
+            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
+
+        if sync:
+            to_delete = set()
+            for s in self._segments:
+                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
+                if bb:
+                    if bb.state() != _BufferBlock.COMMITTED:
+                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
+                    to_delete.add(s.locator)
+                    s.locator = bb.locator()
+            for s in to_delete:
+                self.parent._my_block_manager().delete_bufferblock(s)
+
+        self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
     @synchronized
@@ -880,7 +1022,7 @@ class ArvadosFile(object):
 
     def _add_segment(self, blocks, pos, size):
         """Internal implementation of add_segment."""
 
     def _add_segment(self, blocks, pos, size):
         """Internal implementation of add_segment."""
-        self._modified = True
+        self._committed = False
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
@@ -914,8 +1056,8 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._modified = True
-        self.flush()
+        self._committed = False
+        self.flush(sync=True)
         self.parent.remove(self.name)
         self.parent = newparent
         self.name = newname
@@ -930,8 +1072,8 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     """
 
 
     """
 
-    def __init__(self, arvadosfile,  mode="r", num_retries=None):
-        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile, num_retries=None):
+        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
         self.arvadosfile = arvadosfile
 
     def size(self):
@@ -983,7 +1125,9 @@ class ArvadosFileWriter(ArvadosFileReader):
     """
 
     def __init__(self, arvadosfile, mode, num_retries=None):
     """
 
     def __init__(self, arvadosfile, mode, num_retries=None):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
+        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
+        self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -999,7 +1143,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     @retry_method
     def writelines(self, seq, num_retries=None):
         for s in seq:
-            self.write(s, num_retries)
+            self.write(s, num_retries=num_retries)
 
     @_FileLikeObjectBase._before_close
     def truncate(self, size=None):
@@ -1015,5 +1159,5 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     def close(self):
         if not self.closed:
-            self.flush()
+            self.arvadosfile.remove_writer(self)
             super(ArvadosFileWriter, self).close()