Merge branch '9397-prepopulate-output-directory' refs #9397
[arvados.git] / sdk / python / arvados / arvfile.py
index 1ca7ad82ed845844a2e18cc7bf4b7a9d24c3d652..edeb910570ed80af8d1b45589cc252f7b58e718f 100644 (file)
@@ -404,7 +404,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -414,9 +414,14 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
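A minimal sketch of the new constructor argument (not part of the patch): _BlockManager and KeepClient are the SDK classes involved above, while the replica count and thread count below are arbitrary illustration values. A falsy put_threads (None or 0) falls back to DEFAULT_PUT_THREADS, per the __init__ logic in this hunk.

import arvados
import arvados.keep
from arvados.arvfile import _BlockManager

# Requires a configured Arvados client (ARVADOS_API_HOST / ARVADOS_API_TOKEN).
keep = arvados.keep.KeepClient(api_client=arvados.api('v1'))

# put_threads=4 starts four uploader threads instead of the default two.
block_manager = _BlockManager(keep, copies=2, put_threads=4)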
@@ -482,28 +487,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be sent to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -514,7 +519,7 @@ class _BlockManager(object):
                     return
                 self._keep.get(b)
             except Exception:
-                pass
+                _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
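The prefetch worker now records failures instead of silently discarding them. A minimal illustration of the same logging.exception pattern, with made-up function and argument names: it logs the message plus the current traceback, so background errors become visible without killing the thread.

import logging

_logger = logging.getLogger('arvados.arvfile')

def prefetch_one(keep_get, block_locator):
    # keep_get and block_locator stand in for self._keep.get and the
    # queued locator in the real worker loop.
    try:
        keep_get(block_locator)
    except Exception:
        _logger.exception("Exception doing block prefetch")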
@@ -555,24 +560,34 @@ class _BlockManager(object):
         self.stop_threads()
 
     @synchronized
-    def repack_small_blocks(self, force=False, sync=False):
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep.
-        # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
-        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search for blocks that are ready to be packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
             new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 arvfile.set_segments([Range(new_bb.blockid,
                                             0,
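The repack change above is mostly bookkeeping: each closed small file adds its size to _pending_write_size, and the scan over all bufferblocks only runs once roughly a full Keep block of closed data has accumulated (or when forced). A standalone sketch of that accounting, with illustrative names rather than SDK code:

KEEP_BLOCK_SIZE = 2 ** 26   # 64 MiB, same value as arvados.config.KEEP_BLOCK_SIZE

class RepackAccounting(object):
    def __init__(self):
        self.pending_write_size = 0

    def file_closed(self, closed_file_size, force=False):
        # Cheap step: just accumulate the size reported by remove_writer().
        self.pending_write_size += closed_file_size
        if not force and self.pending_write_size < KEEP_BLOCK_SIZE:
            return "defer: not enough closed data to fill a block yet"
        # Expensive step, taken only now: scan the buffer blocks, recompute
        # the true pending size, and pack small closed blocks into one block.
        return "scan bufferblocks and repack small closed files"

Recomputing the true value before packing matches the comment in the diff: a file opened, written, and closed several times would otherwise be counted more than once.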
@@ -747,6 +762,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
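permission_expired() simply asks each segment's KeepLocator whether its signature timestamp has passed. A hedged sketch of that check on a single locator; the locator string below is made up for illustration (hash+size+A<signature>@<hex expiry>), not a real signed token.

import datetime
import arvados.keep

loc = arvados.keep.KeepLocator(
    "acbd18db4cc2f85cedef654fccc4a4d8+3+Aabadbadbee@53bed294")

# The @53bed294 expiry is a mid-2014 timestamp, so this reports True today.
print(loc.permission_expired())

# An explicit as_of_dt can be passed, mirroring the new ArvadosFile method.
print(loc.permission_expired(as_of_dt=datetime.datetime(2014, 1, 1)))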
@@ -834,19 +857,19 @@ class ArvadosFile(object):
             self._writers.add(writer)
 
     @synchronized
-    def remove_writer(self, writer):
+    def remove_writer(self, writer, flush):
         """
         Called from ArvadosFileWriter.close(). Remove a writer reference from the list
         and do some block maintenance tasks.
         """
         self._writers.remove(writer)
 
-        if self.size() > config.KEEP_BLOCK_SIZE / 2:
+        if flush or self.size() > config.KEEP_BLOCK_SIZE / 2:
             # File writer closed, not small enough for repacking
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """
@@ -1047,12 +1070,15 @@ class ArvadosFile(object):
             return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
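The only_committed flag makes it possible to render a manifest that ignores data still sitting in bufferblocks. A hedged usage sketch follows; it assumes the enclosing Collection class forwards a matching only_committed keyword down to this method (as later SDK releases do), and it needs a configured Arvados client to actually save.

import arvados.collection

c = arvados.collection.Collection()
with c.open('notes.txt', 'w') as f:
    f.write('still buffered, not yet in Keep')

# Skips the uncommitted bufferblock instead of flushing it.
print(c.manifest_text(only_committed=True))

c.save_new(name='only_committed example')    # commits blocks and the record
print(c.manifest_text(only_committed=True))  # now includes notes.txt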
@@ -1166,7 +1192,7 @@ class ArvadosFileWriter(ArvadosFileReader):
     def flush(self):
         self.arvadosfile.flush()
 
-    def close(self):
+    def close(self, flush=True):
         if not self.closed:
-            self.arvadosfile.remove_writer(self)
+            self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
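Finally, ArvadosFileWriter.close(flush=True) keeps the old behavior by default, while close(flush=False) defers the per-file flush so many small files can be batched and repacked before a single collection-level commit. A hedged usage sketch (requires a configured Arvados client; the collection name is arbitrary):

import arvados.collection

c = arvados.collection.Collection()
for i in range(100):
    f = c.open('part%03d.txt' % i, 'w')
    f.write('tiny payload %d\n' % i)
    f.close(flush=False)    # defer: let repack_small_blocks() batch these

c.save_new(name='batched small files')  # everything is committed here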