7824: Moved arv-get code to arvados.commands and replaced bin/arv-get with a stub...
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index ec5e4d5f0473474bf8cc36ad197cfcccb9a82bfa..9db19b05f6bc356c2b673d4983551b2dceef7122 100644
@@ -10,6 +10,8 @@ import copy
 import errno
 import re
 import logging
+import collections
+import uuid
 
 from .errors import KeepWriteError, AssertionError, ArgumentError
 from .keep import KeepLocator
@@ -36,6 +38,12 @@ def split(path):
         stream_name, file_name = '.', path
     return stream_name, file_name
 
+
+class UnownedBlockError(Exception):
+    """Raised when there's an writable block without an owner on the BlockManager."""
+    pass
+
+
 class _FileLikeObjectBase(object):
     def __init__(self, name, mode):
         self.name = name
@@ -402,19 +410,24 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
-        self._bufferblocks = {}
+        self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
         self._prefetch_queue = None
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
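Note on the container change above: moving `_bufferblocks` from a plain dict to `collections.OrderedDict` gives deterministic, insertion-ordered iteration, which the repacking logic introduced below appears to rely on when scanning for the oldest small blocks. A quick illustration:

```python
# Illustrative only: OrderedDict iterates in insertion order, so scans over
# _bufferblocks (e.g. in repack_small_blocks) see the oldest blocks first.
import collections

blocks = collections.OrderedDict()
blocks["block-oldest"] = object()
blocks["block-middle"] = object()
blocks["block-newest"] = object()

# A plain Python 2 dict makes no ordering promise; OrderedDict does.
assert list(blocks.keys())[0] == "block-oldest"
```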
@@ -430,8 +443,11 @@ class _BlockManager(object):
           ArvadosFile that owns this block
 
         """
+        return self._alloc_bufferblock(blockid, starting_capacity, owner)
+
+    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
         if blockid is None:
-            blockid = "bufferblock%i" % len(self._bufferblocks)
+            blockid = "%s" % uuid.uuid4()
         bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
         self._bufferblocks[bufferblock.blockid] = bufferblock
         return bufferblock
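Switching block ids from a `len()`-based counter to `uuid4` matters once buffer blocks can be deleted (as the new repacking path does): the counter could hand out an id that is still referenced by a live segment. A hypothetical sketch of the hazard:

```python
# Hypothetical sketch of the id-reuse hazard that uuid.uuid4() avoids.
import uuid

blocks = {}

def counter_id():
    # Old scheme: the id is derived from the current dict size.
    return "bufferblock%i" % len(blocks)

first = counter_id()       # "bufferblock0"
blocks[first] = "data A"
del blocks[first]          # e.g. removed during repacking
second = counter_id()      # "bufferblock0" again, although segments
blocks[second] = "data B"  # elsewhere may still reference the old id

assert first == second                # collision under the counter scheme
assert uuid.uuid4() != uuid.uuid4()   # random UUIDs are unique in practice
```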
@@ -477,28 +493,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -509,7 +525,7 @@ class _BlockManager(object):
                     return
                 self._keep.get(b)
             except Exception:
-                pass
+                _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
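Logging the traceback instead of `pass`-ing means prefetch failures are at least visible in the logs; since the worker is a daemon thread, nothing else would ever report them. The pattern in isolation:

```python
# Sketch: report unexpected worker-thread failures instead of hiding them.
import logging

_logger = logging.getLogger("example.prefetch")  # illustrative logger name

def run_one(job):
    try:
        job()
    except Exception:
        # logging.exception logs at ERROR level and appends the traceback.
        _logger.exception("Exception doing block prefetch")
```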
@@ -549,35 +565,46 @@ class _BlockManager(object):
     def __exit__(self, exc_type, exc_value, traceback):
         self.stop_threads()
 
-    def repack_small_blocks(self, force=False):
+    @synchronized
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Candidate bblocks -- This could be sorted in some way to prioritize some
-        # kind of bblocks
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner and b.owner.closed() and b.owner.size() <= (config.KEEP_BLOCK_SIZE / 2)]
-        if len(small_blocks) == 0:
-            return
+        self._pending_write_size += closed_file_size
+
+        # Check if there is enough pending data from small blocks to fill a full-sized block
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
 
-        # Check if there's enough small blocks for combining and uploading
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size > (config.KEEP_BLOCK_SIZE / 2)):
-            if len(small_blocks) == 1:
-                # No small blocks for repacking, leave this one alone
-                # so it's committed before exiting.
+            # Search for blocks ready to be packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block whose owner is closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            try:
+                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+            except AttributeError:
+                # Writable blocks without an owner shouldn't exist.
+                raise UnownedBlockError()
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
                 return
-            new_bb = _BufferBlock("bufferblock%i" % len(self._bufferblocks), 2**14, None)
-            self._bufferblocks[new_bb.blockid] = new_bb
-            size = 0
-            while len(small_blocks) > 0 and size <= (config.KEEP_BLOCK_SIZE / 2):
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
+            new_bb = self._alloc_bufferblock()
+            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
-                size += bb.size()
-                new_segs = []
+                arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
-                # FIXME: We shoudn't be accessing _segments directly
-                bb.owner._segments = [Range(new_bb.blockid, 0, bb.size(), size-bb.size())]
-                bb.clear()
-                del self._bufferblocks[bb.blockid]
-            # new_bb's size greater half a keep block, let's commit it
-            self.commit_bufferblock(new_bb, sync=True)
+                arvfile.set_segments([Range(new_bb.blockid,
+                                            0,
+                                            bb.size(),
+                                            new_bb.write_pointer - bb.size())])
+                self._delete_bufferblock(bb.blockid)
+            self.commit_bufferblock(new_bb, sync=sync)
 
     def commit_bufferblock(self, block, sync):
         """Initiate a background upload of a bufferblock.
@@ -628,6 +655,9 @@ class _BlockManager(object):
 
     @synchronized
     def delete_bufferblock(self, locator):
+        self._delete_bufferblock(locator)
+
+    def _delete_bufferblock(self, locator):
         bb = self._bufferblocks[locator]
         bb.clear()
         del self._bufferblocks[locator]
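Splitting `delete_bufferblock` into a `@synchronized` public method and a lock-free `_delete_bufferblock` is the usual workaround for a non-reentrant lock: callers that already hold the lock (such as `repack_small_blocks` above) use the private form. A sketch of the pattern, mirroring the `synchronized` decorator this file defines (acquire `self.lock` around the call):

```python
# Sketch of the @synchronized-wrapper / lock-free-core pattern.
import functools
import threading

def synchronized(orig_func):
    # Takes self.lock around the call, as in arvfile.py.
    @functools.wraps(orig_func)
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return orig_func(self, *args, **kwargs)
    return wrapper

class Manager(object):
    def __init__(self):
        self.lock = threading.Lock()  # non-reentrant
        self.items = {}

    @synchronized
    def delete(self, key):
        # Public entry point: takes the lock, then delegates.
        self._delete(key)

    def _delete(self, key):
        # Lock-free core, callable from other @synchronized methods.
        del self.items[key]

    @synchronized
    def clear(self):
        for key in list(self.items):
            self._delete(key)  # calling self.delete() here would deadlock
```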
@@ -658,8 +688,9 @@ class _BlockManager(object):
         are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
 
         """
+        self.repack_small_blocks(force=True, sync=True)
+
         with self.lock:
-            self.repack_small_blocks(force=True)
             items = self._bufferblocks.items()
 
         for k,v in items:
@@ -730,7 +761,7 @@ class ArvadosFile(object):
         """
         self.parent = parent
         self.name = name
-        self._closed = False
+        self._writers = set()
         self._committed = False
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -741,6 +772,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
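`permission_expired` lets callers detect stale `+A` permission signatures on a file's block locators before attempting reads. A signed Keep locator embeds a hex expiry timestamp after the `@`; a hedged, simplified sketch of the check (not the SDK's actual `KeepLocator` implementation):

```python
# Simplified sketch of a permission-expiry check on a signed Keep locator,
# format: <md5>+<size>+A<signature>@<hex unix timestamp>.
import time

def permission_expired(locator, as_of=None):
    as_of = time.time() if as_of is None else as_of
    for hint in locator.split("+")[2:]:
        if hint.startswith("A") and "@" in hint:
            expiry = int(hint.split("@")[1], 16)
            return expiry <= as_of
    return False  # unsigned locators carry no expiry hint

loc = "acbd18db4cc2f85cedef654fccc4a4d8+3+Aabcdef0123@ffffffff"
assert not permission_expired(loc)  # 0xffffffff is decades in the future
```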
@@ -772,7 +811,7 @@ class ArvadosFile(object):
 
             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 
-        self._committed = False
+        self.set_committed(False)
 
     def __eq__(self, other):
         if other is self:
@@ -808,9 +847,22 @@ class ArvadosFile(object):
         return not self.__eq__(other)
 
     @synchronized
-    def set_committed(self):
-        """Set committed flag to False"""
-        self._committed = True
+    def set_segments(self, segs):
+        self._segments = segs
+
+    @synchronized
+    def set_committed(self, value=True):
+        """Set committed flag.
+
+        If value is True, set committed to be True.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        self._committed = value
+        if self._committed is False and self.parent is not None:
+            self.parent.set_committed(False)
 
     @synchronized
     def committed(self):
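`set_committed(False)` now bubbles the "modified" state up through the parent collections, so a later sync knows every container on the path needs rewriting; the early return stops redundant walks once an ancestor is already marked. The propagation in miniature:

```python
# Miniature version of the dirty-flag propagation in set_committed(False).
class Node(object):
    def __init__(self, parent=None):
        self.parent = parent
        self._committed = True

    def set_committed(self, value=True):
        if value == self._committed:
            return  # already in that state; stop walking up
        self._committed = value
        if value is False and self.parent is not None:
            self.parent.set_committed(False)  # mark ancestors modified too

root = Node()
child = Node(parent=root)
child.set_committed(False)
assert root._committed is False  # the change bubbled up to the root
```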
@@ -818,15 +870,32 @@ class ArvadosFile(object):
         return self._committed
 
     @synchronized
-    def set_closed(self):
-        """Set current block as pending and closed flag to False"""
-        self._closed = True
-        self.parent._my_block_manager().repack_small_blocks()
+    def add_writer(self, writer):
+        """Add an ArvadosFileWriter reference to the list of writers"""
+        if isinstance(writer, ArvadosFileWriter):
+            self._writers.add(writer)
 
     @synchronized
+    def remove_writer(self, writer, flush):
+        """
+        Called from ArvadosFileWriter.close(). Removes a writer reference from the list
+        and performs any needed block maintenance (flush or repack).
+        """
+        self._writers.remove(writer)
+
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
+            # File writer closed: flush was requested or the file is too big for repacking
+            self.flush()
+        elif self.closed():
+            # All writers closed and size is adequate for repacking
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
+
     def closed(self):
-        """Get whether this is closed or not."""
-        return self._closed
+        """
+        Get whether this is closed or not. When the writers list is empty, the file
+        is supposed to be closed.
+        """
+        return len(self._writers) == 0
 
     @must_be_writable
     @synchronized
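The single `_closed` boolean becomes a set of live `ArvadosFileWriter`s: the file only counts as closed, and only becomes a repacking candidate, once every writer has detached. The bookkeeping in miniature:

```python
# Miniature version of the writer-tracking that replaces the _closed flag.
class TrackedFile(object):
    def __init__(self):
        self._writers = set()

    def add_writer(self, writer):
        self._writers.add(writer)

    def remove_writer(self, writer):
        self._writers.remove(writer)

    def closed(self):
        # Closed only when no writer still holds the file open.
        return len(self._writers) == 0

f = TrackedFile()
w1, w2 = object(), object()
f.add_writer(w1)
f.add_writer(w2)
f.remove_writer(w1)
assert not f.closed()  # w2 is still writing
f.remove_writer(w2)
assert f.closed()      # now eligible for flush/repacking
```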
@@ -854,7 +923,7 @@ class ArvadosFile(object):
                     new_segs.append(r)
 
             self._segments = new_segs
-            self._committed = False
+            self.set_committed(False)
         elif size > self.size():
             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
@@ -941,7 +1010,7 @@ class ArvadosFile(object):
                 n += config.KEEP_BLOCK_SIZE
             return
 
-        self._committed = False
+        self.set_committed(False)
 
         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1004,7 +1073,7 @@ class ArvadosFile(object):
 
     def _add_segment(self, blocks, pos, size):
         """Internal implementation of add_segment."""
-        self._committed = False
+        self.set_committed(False)
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
@@ -1020,12 +1089,15 @@ class ArvadosFile(object):
             return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
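The new `only_committed` flag lets `manifest_text` emit a manifest that references only data already in Keep, skipping segments still held in local buffer blocks. A schematic of the filter with hypothetical helper names:

```python
# Schematic of the only_committed filter (hypothetical helper names).
def committed_locators(locators, is_bufferblock, only_committed=False):
    out = []
    for loc in locators:
        if is_bufferblock(loc):
            if only_committed:
                continue  # skip data not yet committed to Keep
        out.append(loc)
    return out

locs = ["37b51d194a7513e45b56f6524f2d51f2+3", "local-bufferblock-id"]
kept = committed_locators(locs, lambda l: l.startswith("local"), True)
assert kept == ["37b51d194a7513e45b56f6524f2d51f2+3"]
```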
@@ -1038,7 +1110,7 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush(sync=True)
         self.parent.remove(self.name)
         self.parent = newparent
@@ -1109,6 +1181,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def __init__(self, arvadosfile, mode, num_retries=None):
         super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
         self.mode = mode
+        self.arvadosfile.add_writer(self)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -1140,7 +1213,5 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     def close(self, flush=True):
         if not self.closed:
-            if flush:
-                self.flush()
-            self.arvadosfile.set_closed()
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()