19215: store keep blocks in /var/lib/arvados/keep instead of /tmp
[arvados.git] / sdk / python / arvados / arvfile.py
index b21ebd331769109ca0cca3b23e2c45f17ed2c9ed..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644 (file)
@@ -479,9 +479,9 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 4
+    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
         self.lock = threading.Lock()
         self.prefetch_enabled = True
         self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
         self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
@@ -593,10 +593,7 @@ class _BlockManager(object):
                 b = self._prefetch_queue.get()
                 if b is None:
                     return
-                if self._keep.has_cache_slot(b):
-                    continue
-                _logger.debug("prefetching %s", b)
-                self._keep.get(b)
+                self._keep.get(b, prefetch=True)
             except Exception:
                 _logger.exception("Exception doing block prefetch")
 
@@ -844,15 +841,11 @@ class _BlockManager(object):
         if not self.prefetch_enabled:
             return
 
-        if self._keep.has_cache_slot(locator):
-            return
-
         with self.lock:
             if locator in self._bufferblocks:
                 return
 
         self.start_get_threads()
-        # _logger.debug("pushing %s to prefetch", locator)
         self._prefetch_queue.put(locator)
 
 
@@ -1103,7 +1096,7 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE*4, limit=32)
+            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
 
         locs = set()
         data = []
@@ -1121,10 +1114,7 @@ class ArvadosFile(object):
                 self.parent._my_block_manager().block_prefetch(lr.locator)
                 locs.add(lr.locator)
 
-        if len(data) == 1:
-            return data[0]
-        else:
-            return b''.join(data)
+        return b''.join(data)
 
     @must_be_writable
     @synchronized