yield data
def decompressed_name(self):
    """Return this file's name without a trailing compression suffix.

    Only a final ``.bz2`` or ``.gz`` extension is removed; any other
    name is returned unchanged.
    """
    # Raw string so that ``\.`` reaches the regex engine as an escaped
    # dot instead of being parsed as an (invalid) string escape.
    return re.sub(r'\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
def seek(self, pos, whence=os.SEEK_SET):
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep,
+ copies=None,
+ put_threads=None,
+ num_retries=None,
+ storage_classes_func=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
- self._prefetch_queue = None
- self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
thread.daemon = True
thread.start()
- def _block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b)
- except Exception:
- _logger.exception("Exception doing block prefetch")
-
- @synchronized
- def start_get_threads(self):
- if self._prefetch_threads is None:
- self._prefetch_queue = queue.Queue()
- self._prefetch_threads = []
- for i in range(0, self.num_get_threads):
- thread = threading.Thread(target=self._block_prefetch_worker)
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
-
-
@synchronized
def stop_threads(self):
"""Shut down and wait for background upload and download threads to finish."""
self._put_threads = None
self._put_queue = None
- if self._prefetch_threads is not None:
- for t in self._prefetch_threads:
- self._prefetch_queue.put(None)
- for t in self._prefetch_threads:
- t.join()
- self._prefetch_threads = None
- self._prefetch_queue = None
-
def __enter__(self):
return self
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
    """Clear and forget the buffer block stored under `locator`.

    Does nothing when `locator` is not present in
    `self._bufferblocks`, so deleting the same block twice is harmless.
    """
    try:
        bufferblock = self._bufferblocks[locator]
    except KeyError:
        # Already removed (or never registered): nothing to clean up.
        return
    # Clear before unregistering, matching the original statement order.
    bufferblock.clear()
    del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
owner.flush(sync=True)
self.delete_bufferblock(k)
+ self.stop_threads()
+
def block_prefetch(self, locator):
    """Initiate a background download of a block.

    The request is handed to `self._keep.block_prefetch`. Nothing is
    requested when `self.prefetch_enabled` is false, or when `locator`
    is one of this manager's own buffer blocks.
    """
    if not self.prefetch_enabled:
        return

    # Hold the lock only for the membership test, not across the call
    # into the Keep client.
    with self.lock:
        if locator in self._bufferblocks:
            return

    self._keep.block_prefetch(locator)
class ArvadosFile(object):
"""
__slots__ = ('parent', 'name', '_writers', '_committed',
- '_segments', 'lock', '_current_bblock', 'fuse_entry')
+ '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
def __init__(self, parent, name, stream=[], segments=[]):
"""
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
+ self._read_counter = 0
def writable(self):
return self.parent.writable()
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = None
+ if self.parent._my_block_manager()._keep.num_prefetch_threads > 0 and (self._read_counter % 128) == 0:
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads,
+ limit=(1+self.parent._my_block_manager()._keep.num_prefetch_threads))
+ self._read_counter += 1
locs = set()
data = []
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
- data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
locs.add(lr.locator)
else:
break
- for lr in prefetch:
- if lr.locator not in locs:
- self.parent._my_block_manager().block_prefetch(lr.locator)
- locs.add(lr.locator)
+ if prefetch:
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized