20933: Use [0-9] instead of \d in regex
[arvados.git] / sdk / python / arvados / arvfile.py
index 7c6b732d36fcefc5a88896382ed99b61b83f9d4d..4b95835aac0f25a57fd999a2c5f9cff0e54014e1 100644 (file)
@@ -100,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_SET):
@@ -479,25 +479,22 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=[]):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
         self.prefetch_enabled = True
-        if put_threads:
-            self.num_put_threads = put_threads
-        else:
-            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.copies = copies
-        self.storage_classes = storage_classes
+        self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
@@ -556,9 +553,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -573,7 +570,7 @@ class _BlockManager(object):
 
                 # If we don't limit the Queue size, the upload queue can quickly
                 # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
+                # generating data more quickly than it can be sent to the Keep
                 # servers.
                 #
                 # With two upload threads and a queue size of 2, this means up to 4
@@ -589,29 +586,6 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -624,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-        self._prefetch_threads = None
-        self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -727,9 +693,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -763,9 +729,10 @@ class _BlockManager(object):
         self._delete_bufferblock(locator)
 
     def _delete_bufferblock(self, locator):
-        bb = self._bufferblocks[locator]
-        bb.clear()
-        del self._bufferblocks[locator]
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            bb.clear()
+            del self._bufferblocks[locator]
 
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
@@ -830,28 +797,20 @@ class _BlockManager(object):
                         owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
-
         """
 
         if not self.prefetch_enabled:
             return
 
-        if self._keep.get_from_cache(locator) is not None:
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
 
-        self.start_get_threads()
-        self._prefetch_queue.put(locator)
+        self._keep.block_prefetch(locator)
 
 
 class ArvadosFile(object):
@@ -1101,7 +1060,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
 
         locs = set()
         data = []