11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / sdk / python / arvados / arvfile.py
index eadb3a9bd1638cb9b384c3592b4790fe1f97e3bd..9db19b05f6bc356c2b673d4983551b2dceef7122 100644 (file)
@@ -38,6 +38,12 @@ def split(path):
         stream_name, file_name = '.', path
     return stream_name, file_name
 
+
+class UnownedBlockError(Exception):
+    """Raised when there's a writable block without an owner on the BlockManager."""
+    pass
+
+
 class _FileLikeObjectBase(object):
     def __init__(self, name, mode):
         self.name = name
@@ -404,7 +410,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -414,7 +420,10 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self._pending_write_size = 0
@@ -516,7 +525,7 @@ class _BlockManager(object):
                     return
                 self._keep.get(b)
             except Exception:
-                pass
+                _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
@@ -568,7 +577,11 @@ class _BlockManager(object):
             # A WRITABLE block always has an owner.
             # A WRITABLE block with its owner.closed() implies that its
             # size is <= KEEP_BLOCK_SIZE/2.
-            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+            try:
+                small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+            except AttributeError:
+                # Writable blocks without owner shouldn't exist.
+                raise UnownedBlockError()
 
             if len(small_blocks) <= 1:
                 # Not enough small blocks for repacking
@@ -798,7 +811,7 @@ class ArvadosFile(object):
 
             self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
 
-        self._committed = False
+        self.set_committed(False)
 
     def __eq__(self, other):
         if other is self:
@@ -838,9 +851,18 @@ class ArvadosFile(object):
         self._segments = segs
 
     @synchronized
-    def set_committed(self):
-        """Set committed flag to True"""
-        self._committed = True
+    def set_committed(self, value=True):
+        """Set committed flag.
+
+        If value is True, set committed to be True.
+
+        If value is False, set committed to be False for this and all parents.
+        """
+        if value == self._committed:
+            return
+        self._committed = value
+        if self._committed is False and self.parent is not None:
+            self.parent.set_committed(False)
 
     @synchronized
     def committed(self):
@@ -901,7 +923,7 @@ class ArvadosFile(object):
                     new_segs.append(r)
 
             self._segments = new_segs
-            self._committed = False
+            self.set_committed(False)
         elif size > self.size():
             raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
@@ -988,7 +1010,7 @@ class ArvadosFile(object):
                 n += config.KEEP_BLOCK_SIZE
             return
 
-        self._committed = False
+        self.set_committed(False)
 
         if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
             self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
@@ -1051,7 +1073,7 @@ class ArvadosFile(object):
 
     def _add_segment(self, blocks, pos, size):
         """Internal implementation of add_segment."""
-        self._committed = False
+        self.set_committed(False)
         for lr in locators_and_ranges(blocks, pos, size):
             last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
@@ -1067,12 +1089,15 @@ class ArvadosFile(object):
             return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()
@@ -1085,7 +1110,7 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def _reparent(self, newparent, newname):
-        self._committed = False
+        self.set_committed(False)
         self.flush(sync=True)
         self.parent.remove(self.name)
         self.parent = newparent