yield data
def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self.name)
+ return re.sub(r'\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
def seek(self, pos, whence=os.SEEK_SET):
"""
DEFAULT_PUT_THREADS = 2
- DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes=[]):
+ def __init__(self, keep,
+ copies=None,
+ put_threads=None,
+ num_retries=None,
+ storage_classes_func=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self._put_queue = None
self._put_threads = None
- self._prefetch_queue = None
- self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.copies = copies
- self.storage_classes = storage_classes
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
+ # generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
thread.daemon = True
thread.start()
- def _block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b)
- except Exception:
- _logger.exception("Exception doing block prefetch")
-
- @synchronized
- def start_get_threads(self):
- if self._prefetch_threads is None:
- self._prefetch_queue = queue.Queue()
- self._prefetch_threads = []
- for i in range(0, self.num_get_threads):
- thread = threading.Thread(target=self._block_prefetch_worker)
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
-
-
@synchronized
def stop_threads(self):
"""Shut down and wait for background upload and download threads to finish."""
self._put_threads = None
self._put_queue = None
- if self._prefetch_threads is not None:
- for t in self._prefetch_threads:
- self._prefetch_queue.put(None)
- for t in self._prefetch_threads:
- t.join()
- self._prefetch_threads = None
- self._prefetch_queue = None
-
def __enter__(self):
return self
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
- bb = self._bufferblocks[locator]
- bb.clear()
- del self._bufferblocks[locator]
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
owner.flush(sync=True)
self.delete_bufferblock(k)
+ self.stop_threads()
+
def block_prefetch(self, locator):
"""Initiate a background download of a block.
-
- This assumes that the underlying KeepClient implements a block cache,
- so repeated requests for the same block will not result in repeated
- downloads (unless the block is evicted from the cache.) This method
- does not block.
-
"""
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
- self.start_get_threads()
- self._prefetch_queue.put(locator)
+ self._keep.block_prefetch(locator)
class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager()._keep.num_prefetch_threads, limit=32)
locs = set()
data = []