class ArvadosFileReaderBase(_FileLikeObjectBase):
def __init__(self, name, mode, num_retries=None):
    """Initialize reader-side state.

    :name: file name (used by decompressed_name() and repr).
    :mode: open-mode string, e.g. 'rb'.
    :num_retries: default retry count applied to read operations.
    """
    super(ArvadosFileReaderBase, self).__init__(name, mode)
    # Current seek position within the file.
    self._filepos = 0
    self.num_retries = num_retries
    # Cache of (position, remainder) used by readline() to avoid
    # re-reading the same block for consecutive line reads.
    self._readline_cache = (None, None)
yield data
def decompressed_name(self):
    """Return self.name with a trailing .bz2 or .gz suffix removed.

    Names without a compression suffix are returned unchanged.
    """
    # Raw string so '\.' is a literal-dot regex escape, not a (deprecated)
    # string escape sequence.
    return re.sub(r'\.(bz2|gz)$', '', self.name)
@_FileLikeObjectBase._before_close
def seek(self, pos, whence=os.SEEK_SET):
"""
# Number of background upload threads started on demand.
DEFAULT_PUT_THREADS = 2

def __init__(self, keep,
             copies=None,
             put_threads=None,
             num_retries=None,
             storage_classes_func=None):
    """keep: KeepClient object to use

    :copies: desired replication level for written blocks, or None for
        the server default.
    :put_threads: number of upload threads; defaults to
        DEFAULT_PUT_THREADS when falsy.
    :num_retries: retry count passed through to KeepClient put() calls.
    :storage_classes_func: callable returning the list of storage classes
        for new blocks; defaults to a callable returning [].
    """
    self._keep = keep
    # Locator -> _BufferBlock for blocks not yet committed to Keep.
    self._bufferblocks = collections.OrderedDict()
    self._put_queue = None
    self._put_threads = None
    self.lock = threading.Lock()
    # Prefetching is delegated to the KeepClient; a falsy thread count
    # there disables prefetch entirely.
    self.prefetch_lookahead = self._keep.num_prefetch_threads
    self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
    self.copies = copies
    self.storage_classes = storage_classes_func or (lambda: [])
    self._pending_write_size = 0
    self.threads_lock = threading.Lock()
    self.padding_block = None
    self.num_retries = num_retries
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
# generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
thread.daemon = True
thread.start()
- def _block_prefetch_worker(self):
- """The background downloader thread."""
- while True:
- try:
- b = self._prefetch_queue.get()
- if b is None:
- return
- self._keep.get(b)
- except Exception:
- _logger.exception("Exception doing block prefetch")
-
- @synchronized
- def start_get_threads(self):
- if self._prefetch_threads is None:
- self._prefetch_queue = queue.Queue()
- self._prefetch_threads = []
- for i in range(0, self.num_get_threads):
- thread = threading.Thread(target=self._block_prefetch_worker)
- self._prefetch_threads.append(thread)
- thread.daemon = True
- thread.start()
-
-
@synchronized
def stop_threads(self):
    """Shut down and wait for background upload and download threads to finish."""
    # NOTE(review): the signalling/joining of the put threads happens in
    # elided context above these assignments — confirm against the full
    # method before relying on this summary.
    self._put_threads = None
    self._put_queue = None
def __enter__(self):
    # Context-manager protocol: the object itself is the managed resource;
    # no additional setup is needed on entry.
    return self
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
    """Remove the bufferblock registered under ``locator``, if any.

    Clears the block's buffer before dropping it from the registry.
    A locator with no registered bufferblock is a no-op rather than a
    KeyError, so callers may delete defensively.
    """
    if locator in self._bufferblocks:
        bb = self._bufferblocks[locator]
        bb.clear()
        del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
owner.flush(sync=True)
self.delete_bufferblock(k)
+ self.stop_threads()
+
def block_prefetch(self, locator):
    """Initiate a background download of a block.

    The download itself is delegated to the KeepClient's prefetch
    machinery; this method does not block.
    """
    # A falsy lookahead means the KeepClient has no prefetch threads,
    # so prefetching is disabled.
    if not self.prefetch_lookahead:
        return
    with self.lock:
        # If we already hold a bufferblock for this locator, the data is
        # local and there is nothing to fetch.
        if locator in self._bufferblocks:
            return
    self._keep.block_prefetch(locator)
class ArvadosFile(object):
"""
# __slots__ keeps per-instance memory low: a collection may hold very
# many ArvadosFile objects.  '_read_counter' throttles prefetch checks
# in read().
__slots__ = ('parent', 'name', '_writers', '_committed',
             '_segments', 'lock', '_current_bblock', 'fuse_entry',
             '_read_counter')
def __init__(self, parent, name, stream=[], segments=[]):
"""
for s in segments:
self._add_segment(stream, s.locator, s.range_size)
self._current_bblock = None
+ self._read_counter = 0
def writable(self):
    # Writability is a property of the parent collection, not per-file
    # state; delegate the decision upward.
    return self.parent.writable()
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+
+ prefetch = None
+ prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
+ if prefetch_lookahead:
+ # Doing prefetch on every read() call is surprisingly expensive
+ # when we're trying to deliver data at 600+ MiBps and want
+ # the read() fast path to be as lightweight as possible.
+ #
+ # Only prefetching every 128 read operations
+ # dramatically reduces the overhead while still
+ # getting the benefit of prefetching (e.g. when
+ # reading 128 KiB at a time, it checks for prefetch
+ # every 16 MiB).
+ self._read_counter = (self._read_counter+1) % 128
+ if self._read_counter == 1:
+ prefetch = locators_and_ranges(self._segments,
+ offset + size,
+ config.KEEP_BLOCK_SIZE * prefetch_lookahead,
+ limit=(1+prefetch_lookahead))
locs = set()
data = []
block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
if block:
blockview = memoryview(block)
- data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+ data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
locs.add(lr.locator)
else:
break
- for lr in prefetch:
- if lr.locator not in locs:
- self.parent._my_block_manager().block_prefetch(lr.locator)
- locs.add(lr.locator)
+ if prefetch:
+ for lr in prefetch:
+ if lr.locator not in locs:
+ self.parent._my_block_manager().block_prefetch(lr.locator)
+ locs.add(lr.locator)
- return b''.join(data)
+ if len(data) == 1:
+ return data[0]
+ else:
+ return b''.join(data)
@must_be_writable
@synchronized
def stream_name(self):
    # Delegate to the parent collection of the wrapped ArvadosFile.
    return self.arvadosfile.parent.stream_name()
def readinto(self, b):
    """Read up to len(b) bytes into the writable buffer ``b``.

    Returns the number of bytes actually read, which may be less than
    len(b) at end of file (0 at EOF), matching the io.RawIOBase
    readinto contract.
    """
    data = self.read(len(b))
    b[:len(data)] = data
    return len(data)
+
@_FileLikeObjectBase._before_close
@retry_method
def read(self, size=None, num_retries=None):
if not self.closed:
self.arvadosfile.remove_writer(self, flush)
super(ArvadosFileWriter, self).close()
+
+
class WrappableFile(object):
    """An interface to an Arvados file that's compatible with io wrappers.

    Wraps a file-like object ``f`` and forwards the standard io methods
    to it, while tracking its own ``closed`` flag so wrappers such as
    io.BufferedReader / io.TextIOWrapper can layer on top.
    """
    def __init__(self, f):
        # Wrapped file object; exposed as an attribute for callers.
        self.f = f
        self.closed = False
    def close(self):
        """Mark this wrapper closed and close the underlying file."""
        self.closed = True
        return self.f.close()
    def flush(self):
        return self.f.flush()
    def read(self, *args, **kwargs):
        return self.f.read(*args, **kwargs)
    def readable(self):
        return self.f.readable()
    def readinto(self, *args, **kwargs):
        return self.f.readinto(*args, **kwargs)
    def seek(self, *args, **kwargs):
        return self.f.seek(*args, **kwargs)
    def seekable(self):
        return self.f.seekable()
    def tell(self):
        return self.f.tell()
    def writable(self):
        return self.f.writable()
    def write(self, *args, **kwargs):
        return self.f.write(*args, **kwargs)