Merge branch '9397-prepopulate-output-directory' refs #9397
[arvados.git] / sdk / python / arvados / arvfile.py
index 517d617d8c4f8403953b5d0b105808e0bd18ac0d..edeb910570ed80af8d1b45589cc252f7b58e718f 100644 (file)
@@ -404,7 +404,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None):
+    def __init__(self, keep, copies=None, put_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -414,7 +414,10 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+        if put_threads:
+            self.num_put_threads = put_threads
+        else:
+            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self._pending_write_size = 0
@@ -759,6 +762,14 @@ class ArvadosFile(object):
     def writable(self):
         return self.parent.writable()
 
+    @synchronized
+    def permission_expired(self, as_of_dt=None):
+        """Returns True if any of the segment's locators is expired"""
+        for r in self._segments:
+            if KeepLocator(r.locator).permission_expired(as_of_dt):
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
@@ -1059,12 +1070,15 @@ class ArvadosFile(object):
             return 0
 
     @synchronized
-    def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+    def manifest_text(self, stream_name=".", portable_locators=False,
+                      normalize=False, only_committed=False):
         buf = ""
         filestream = []
         for segment in self.segments:
             loc = segment.locator
-            if loc.startswith("bufferblock"):
+            if self.parent._my_block_manager().is_bufferblock(loc):
+                if only_committed:
+                    continue
                 loc = self._bufferblocks[loc].calculate_locator()
             if portable_locators:
                 loc = KeepLocator(loc).stripped()