20933: Use [0-9] instead of \d in regex
[arvados.git] / sdk / python / arvados / arvfile.py
index 2ce0e46b30bd67ad948f832183ab091865c2ea53..4b95835aac0f25a57fd999a2c5f9cff0e54014e1 100644 (file)
@@ -100,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_SET):
@@ -479,20 +479,20 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -586,29 +586,6 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b, prefetch=True)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -621,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-        self._prefetch_threads = None
-        self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -828,14 +797,10 @@ class _BlockManager(object):
                         owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
-
         """
 
         if not self.prefetch_enabled:
@@ -845,8 +810,7 @@ class _BlockManager(object):
             if locator in self._bufferblocks:
                 return
 
-        self.start_get_threads()
-        self._prefetch_queue.put(locator)
+        self._keep.block_prefetch(locator)
 
 
 class ArvadosFile(object):
@@ -1096,7 +1060,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []