Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
[arvados.git] / sdk / python / arvados / arvfile.py
index b4102f4590464088a5159a1565fe4b4aa32b32c1..c0ef5810728e42776d4b57c4f4e6da0b03dd4618 100644
@@ -441,11 +441,17 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self._prefetch_queue = None
 
-    def commit_bufferblock(self, block):
+    def commit_bufferblock(self, block, wait):
         """Initiate a background upload of a bufferblock.
 
-        This will block if the upload queue is at capacity, otherwise it will
-        return immediately.
+        :block:
+          The block object to upload
+
+        :wait:
+          If `wait` is True, upload the block synchronously.
+      If `wait` is False, upload the block asynchronously.  This will
+      return immediately unless the upload queue is at capacity, in
+      which case it will block until a queue slot is free.
 
         """
 
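
A minimal caller-side sketch of the two modes (`bm` and `bb` are illustrative stand-ins for a _BlockManager and a _BufferBlock, not names from this diff):

    bm.commit_bufferblock(bb, True)    # returns only once the block is
                                       # COMMITTED to Keep
    bm.commit_bufferblock(bb, False)   # hands the block to the uploader
                                       # threads and returns; blocks only
                                       # while the bounded queue is full
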
@@ -457,6 +463,7 @@ class _BlockManager(object):
                     bufferblock = self._put_queue.get()
                     if bufferblock is None:
                         return
+
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
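
The worker above is a standard sentinel-terminated queue consumer. A self-contained sketch of the pattern in the Python 2 idiom this file uses (all names illustrative):

    import Queue
    import threading

    q = Queue.Queue(maxsize=2)

    def worker():
        while True:
            item = q.get()
            try:
                if item is None:    # sentinel: tells the worker to exit
                    return
                # ... upload `item` to Keep here ...
            finally:
                q.task_done()       # keep q.join() accounting balanced

    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()
    q.put(None)                     # shut the worker down
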
@@ -466,32 +473,40 @@ class _BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()
 
-        with self.lock:
-            if self._put_threads is None:
-                # Start uploader threads.
-
-                # If we don't limit the Queue size, the upload queue can quickly
-                # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
-                # servers.
-                #
-                # With two upload threads and a queue size of 2, this means up to 4
-                # blocks pending.  If they are full 64 MiB blocks, that means up to
-                # 256 MiB of internal buffering, which is the same size as the
-                # default download block cache in KeepClient.
-                self._put_queue = Queue.Queue(maxsize=2)
-                self._put_errors = Queue.Queue()
-
-                self._put_threads = []
-                for i in xrange(0, self.num_put_threads):
-                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
-                    self._put_threads.append(thread)
-                    thread.daemon = True
-                    thread.start()
+        if block.state() != _BufferBlock.WRITABLE:
+            return
 
-        # Mark the block as PENDING so to disallow any more appends.
-        block.set_state(_BufferBlock.PENDING)
-        self._put_queue.put(block)
+        if wait:
+            block.set_state(_BufferBlock.PENDING)
+            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+            block.set_state(_BufferBlock.COMMITTED, loc)
+        else:
+            with self.lock:
+                if self._put_threads is None:
+                    # Start uploader threads.
+
+                    # If we don't limit the Queue size, the upload queue can quickly
+                    # grow to take up gigabytes of RAM if the writing process is
+                    # generating data more quickly than it can be sent to the Keep
+                    # servers.
+                    #
+                    # With two upload threads and a queue size of 2, this means up to 4
+                    # blocks pending.  If they are full 64 MiB blocks, that means up to
+                    # 256 MiB of internal buffering, which is the same size as the
+                    # default download block cache in KeepClient.
+                    self._put_queue = Queue.Queue(maxsize=2)
+                    self._put_errors = Queue.Queue()
+
+                    self._put_threads = []
+                    for i in xrange(0, self.num_put_threads):
+                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+                        self._put_threads.append(thread)
+                        thread.daemon = True
+                        thread.start()
+
+            # Mark the block as PENDING to disallow any more appends.
+            block.set_state(_BufferBlock.PENDING)
+            self._put_queue.put(block)
 
     @synchronized
     def get_bufferblock(self, locator):
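
The sizing comment above bounds write-side memory use; spelled out as arithmetic (64 MiB is the full-block size the comment cites):

    num_put_threads = 2                # blocks being uploaded at once
    queue_maxsize = 2                  # blocks waiting in the queue
    block_size = 64 * 1024 * 1024      # bytes in a full Keep block
    max_buffered = (num_put_threads + queue_maxsize) * block_size
    # 4 blocks * 64 MiB = 256 MiB, matching KeepClient's default
    # download block cache
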
@@ -529,7 +544,7 @@ class _BlockManager(object):
 
         for k,v in items:
             if v.state() == _BufferBlock.WRITABLE:
-                self.commit_bufferblock(v)
+                v.owner.flush(False)
 
         with self.lock:
             if self._put_queue is not None:
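
Note the indirection: commit_all() now asks each dirty block's owning file to flush(False) rather than calling commit_bufferblock() itself. Per the flush() hunk further down, that runs _repack_writes() on the block before the asynchronous commit, so overwritten byte ranges are squeezed out before upload. A hedged sketch of the flow:

    for k, v in items:
        if v.state() == _BufferBlock.WRITABLE:
            v.owner.flush(False)    # repack, then enqueue async upload
    # the block manager then drains its upload queue and surfaces any
    # errors the workers collected in _put_errors (outside this hunk)
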
@@ -604,13 +619,13 @@ class ArvadosFile(object):
           a list of Range objects representing segments
         """
         self.parent = parent
+        self.name = name
         self._modified = True
         self._segments = []
         self.lock = parent.root_collection().lock
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
-        self.name = name
 
     def writable(self):
         return self.parent.writable()
@@ -721,8 +736,14 @@ class ArvadosFile(object):
         elif size > self.size():
             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
-    def readfrom(self, offset, size, num_retries):
-        """Read upto `size` bytes from the file starting at `offset`."""
+    def readfrom(self, offset, size, num_retries, exact=False):
+        """Read upto `size` bytes from the file starting at `offset`.
+
+        :exact:
+         If False (default), return less data than requested if the read
+         crosses a block boundary and the next block isn't cached.  If True,
+         only return less data than requested when hitting EOF.
+        """
 
         with self.lock:
             if size == 0 or offset >= self.size():
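
A usage sketch of the new flag (`af` is a hypothetical ArvadosFile handle):

    data = af.readfrom(pos, 4096, num_retries=2)
    # may return fewer than 4096 bytes if the read crosses into a
    # block that is not yet cached

    data = af.readfrom(pos, 4096, num_retries=2, exact=True)
    # returns the full 4096 bytes, fetching blocks as needed; a short
    # read now only means end of file (the behavior a FUSE read
    # handler needs, which fits this branch)
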
@@ -735,14 +756,14 @@ class ArvadosFile(object):
 
         data = []
         for lr in readsegs:
-            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=bool(data))
+            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 data.append(block[lr.segment_offset:lr.segment_offset+lr.segment_size])
             else:
                 break
         return ''.join(data)
 
-    def _repack_writes(self):
+    def _repack_writes(self, num_retries):
         """Test if the buffer block has more data than actual segments.
 
         This happens when a buffered write over-writes a file range written in
@@ -760,9 +781,10 @@
         if write_total < self._current_bblock.size():
             # There is more data in the buffer block than is actually accounted for by segments, so
             # re-pack into a new buffer by copying over to a new buffer block.
+            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
             for t in bufferblock_segs:
-                new_bb.append(self._current_bblock.buffer_view[t.segment_offset:t.segment_offset+t.range_size].tobytes())
+                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
                 t.segment_offset = new_bb.size() - t.range_size
 
             self._current_bblock = new_bb
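
A toy walk-through of the condition being repaired, with invented numbers: suppose two buffered writes overlap in the file.

    # write A covers file range [0, 100)  -> buffer offsets [0, 100)
    # write B covers file range [50, 150) -> buffer offsets [100, 200)
    #
    # Live segments now reference buffer [0, 50) and [100, 200), so
    # write_total = 50 + 100 = 150 while the buffer block holds 200
    # bytes.  The 50 stale bytes at buffer [50, 100) are overwritten
    # data; repacking copies only the live ranges into a fresh buffer
    # block so they are neither stored in Keep nor leaked.
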
@@ -791,9 +813,9 @@ class ArvadosFile(object):
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-            self._repack_writes()
+            self._repack_writes(num_retries)
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         self._current_bblock.append(data)
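
The overflow handling above in outline: the only stall on the write path is backpressure from the bounded upload queue. When an append would push the current buffer block past KEEP_BLOCK_SIZE, writeto() first repacks (which may reclaim overwritten ranges), and only if the block is still too full does it hand the block off asynchronously and start a fresh one. A hedged paraphrase:

    if bblock.size() + len(data) > KEEP_BLOCK_SIZE:
        repack(bblock)              # may free space held by dead bytes
        if bblock.size() + len(data) > KEEP_BLOCK_SIZE:
            commit_async(bblock)    # wait=False: upload in background
            bblock = alloc_new()    # subsequent appends land here
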
@@ -805,11 +827,11 @@ class ArvadosFile(object):
         return len(data)
 
     @synchronized
-    def flush(self):
+    def flush(self, wait=True, num_retries=0):
         if self.modified():
             if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
-                self._repack_writes()
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self._repack_writes(num_retries)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
             self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
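
From a caller's perspective (`af` hypothetical, as above):

    af.flush()         # wait=True: returns once the block is COMMITTED
    af.flush(False)    # enqueue the upload and return; the uploader
                       # threads (or a later commit_all) complete it
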
@@ -856,6 +878,16 @@ class ArvadosFile(object):
         buf += "\n"
         return buf
 
+    @must_be_writable
+    @synchronized
+    def reparent(self, newparent, newname):
+        self.flush()
+        self.parent.remove(self.name)
+
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+        self._modified = True
 
 class ArvadosFileReader(ArvadosFileReaderBase):
     """Wraps ArvadosFile in a file-like object supporting reading only.