projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
18941: Separate get() behavior for prefetch
[arvados.git]
/
sdk
/
python
/
arvados
/
arvfile.py
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e915ff2ac0a37c86c635f9ce68f52d227fed1725..a13575b715922f306c14ce4bb6a82fbe766fae8f 100644
(file)
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -481,7 +481,7 @@ class _BlockManager(object):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@ -492,7 +492,7 @@ class _BlockManager(object):
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
@@ -593,7 +593,7 @@ class _BlockManager(object):
b = self._prefetch_queue.get()
if b is None:
return
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b)
+ self._keep.get(b, cache_slot_get=False)
except Exception:
_logger.exception("Exception doing block prefetch")
except Exception:
_logger.exception("Exception doing block prefetch")
@@ -760,9 +760,10 @@ class _BlockManager(object):
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
- bb = self._bufferblocks[locator]
- bb.clear()
- del self._bufferblocks[locator]
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
@@ -840,14 +841,12 @@ class _BlockManager(object):
if not self.prefetch_enabled:
return
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
self.start_get_threads()
with self.lock:
if locator in self._bufferblocks:
return
self.start_get_threads()
+ # _logger.debug("pushing %s to prefetch", locator)
self._prefetch_queue.put(locator)
self._prefetch_queue.put(locator)
@@ -1098,7 +1097,7 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
locs = set()
data = []