21639: Wrap around read counter, have slightly less indirection
[arvados.git] / sdk / python / arvados / arvfile.py
index 3281d78e209db3a0e69726d285c59b456ea93035..e0e972b5c178422f84b1f2a8614adab1f6321bc9 100644 (file)
@@ -88,9 +88,6 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._binary = 'b' in mode
-        if sys.version_info >= (3, 0) and not self._binary:
-            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
         self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
@@ -103,7 +100,7 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             yield data
 
     def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self.name)
+        return re.sub(r'\.(bz2|gz)$', '', self.name)
 
     @_FileLikeObjectBase._before_close
     def seek(self, pos, whence=os.SEEK_SET):
@@ -482,27 +479,26 @@ class _BlockManager(object):
     """
 
     DEFAULT_PUT_THREADS = 2
-    DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep,
+                 copies=None,
+                 put_threads=None,
+                 num_retries=None,
+                 storage_classes_func=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = collections.OrderedDict()
         self._put_queue = None
         self._put_threads = None
-        self._prefetch_queue = None
-        self._prefetch_threads = None
         self.lock = threading.Lock()
-        self.prefetch_enabled = True
-        if put_threads:
-            self.num_put_threads = put_threads
-        else:
-            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
-        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.prefetch_lookahead = self._keep.num_prefetch_threads
+        self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
         self.copies = copies
+        self.storage_classes = storage_classes_func or (lambda: [])
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
         self.padding_block = None
+        self.num_retries = num_retries
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -557,9 +553,9 @@ class _BlockManager(object):
                     return
 
                 if self.copies is None:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -574,7 +570,7 @@ class _BlockManager(object):
 
                 # If we don't limit the Queue size, the upload queue can quickly
                 # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
+                # generating data more quickly than it can be sent to the Keep
                 # servers.
                 #
                 # With two upload threads and a queue size of 2, this means up to 4
@@ -590,29 +586,6 @@ class _BlockManager(object):
                     thread.daemon = True
                     thread.start()
 
-    def _block_prefetch_worker(self):
-        """The background downloader thread."""
-        while True:
-            try:
-                b = self._prefetch_queue.get()
-                if b is None:
-                    return
-                self._keep.get(b)
-            except Exception:
-                _logger.exception("Exception doing block prefetch")
-
-    @synchronized
-    def start_get_threads(self):
-        if self._prefetch_threads is None:
-            self._prefetch_queue = queue.Queue()
-            self._prefetch_threads = []
-            for i in range(0, self.num_get_threads):
-                thread = threading.Thread(target=self._block_prefetch_worker)
-                self._prefetch_threads.append(thread)
-                thread.daemon = True
-                thread.start()
-
-
     @synchronized
     def stop_threads(self):
         """Shut down and wait for background upload and download threads to finish."""
@@ -625,14 +598,6 @@ class _BlockManager(object):
         self._put_threads = None
         self._put_queue = None
 
-        if self._prefetch_threads is not None:
-            for t in self._prefetch_threads:
-                self._prefetch_queue.put(None)
-            for t in self._prefetch_threads:
-                t.join()
-        self._prefetch_threads = None
-        self._prefetch_queue = None
-
     def __enter__(self):
         return self
 
@@ -728,9 +693,9 @@ class _BlockManager(object):
         if sync:
             try:
                 if self.copies is None:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
                 else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
@@ -764,9 +729,10 @@ class _BlockManager(object):
         self._delete_bufferblock(locator)
 
     def _delete_bufferblock(self, locator):
-        bb = self._bufferblocks[locator]
-        bb.clear()
-        del self._bufferblocks[locator]
+        if locator in self._bufferblocks:
+            bb = self._bufferblocks[locator]
+            bb.clear()
+            del self._bufferblocks[locator]
 
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
@@ -831,28 +797,20 @@ class _BlockManager(object):
                         owner.flush(sync=True)
                     self.delete_bufferblock(k)
 
+        self.stop_threads()
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
-
-        This assumes that the underlying KeepClient implements a block cache,
-        so repeated requests for the same block will not result in repeated
-        downloads (unless the block is evicted from the cache.)  This method
-        does not block.
-
         """
 
-        if not self.prefetch_enabled:
-            return
-
-        if self._keep.get_from_cache(locator) is not None:
+        if not self.prefetch_lookahead:
             return
 
         with self.lock:
             if locator in self._bufferblocks:
                 return
 
-        self.start_get_threads()
-        self._prefetch_queue.put(locator)
+        self._keep.block_prefetch(locator)
 
 
 class ArvadosFile(object):
@@ -867,7 +825,7 @@ class ArvadosFile(object):
     """
 
     __slots__ = ('parent', 'name', '_writers', '_committed',
-                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry', '_read_counter')
 
     def __init__(self, parent, name, stream=[], segments=[]):
         """
@@ -888,6 +846,7 @@ class ArvadosFile(object):
         for s in segments:
             self._add_segment(stream, s.locator, s.range_size)
         self._current_bblock = None
+        self._read_counter = 0
 
     def writable(self):
         return self.parent.writable()
@@ -1102,7 +1061,25 @@ class ArvadosFile(object):
             if size == 0 or offset >= self.size():
                 return b''
             readsegs = locators_and_ranges(self._segments, offset, size)
-            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+
+            prefetch = None
+            prefetch_lookahead = self.parent._my_block_manager().prefetch_lookahead
+            if prefetch_lookahead:
+                # Doing prefetch on every read() call is surprisingly expensive
+                # when we're trying to deliver data at 600+ MiBps and want
+                # the read() fast path to be as lightweight as possible.
+                #
+                # Only prefetching every 128 read operations
+                # dramatically reduces the overhead while still
+                # getting the benefit of prefetching (e.g. when
+                # reading 128 KiB at a time, it checks for prefetch
+                # every 16 MiB).
+                self._read_counter = (self._read_counter+1) % 128
+                if self._read_counter == 1:
+                    prefetch = locators_and_ranges(self._segments,
+                                                   offset + size,
+                                                   config.KEEP_BLOCK_SIZE * prefetch_lookahead,
+                                                   limit=(1+prefetch_lookahead))
 
         locs = set()
         data = []
@@ -1110,17 +1087,21 @@ class ArvadosFile(object):
             block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
             if block:
                 blockview = memoryview(block)
-                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
+                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size])
                 locs.add(lr.locator)
             else:
                 break
 
-        for lr in prefetch:
-            if lr.locator not in locs:
-                self.parent._my_block_manager().block_prefetch(lr.locator)
-                locs.add(lr.locator)
+        if prefetch:
+            for lr in prefetch:
+                if lr.locator not in locs:
+                    self.parent._my_block_manager().block_prefetch(lr.locator)
+                    locs.add(lr.locator)
 
-        return b''.join(data)
+        if len(data) == 1:
+            return data[0]
+        else:
+            return b''.join(data)
 
     @must_be_writable
     @synchronized
@@ -1278,6 +1259,11 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     def stream_name(self):
         return self.arvadosfile.parent.stream_name()
 
+    def readinto(self, b):
+        data = self.read(len(b))
+        b[:len(data)] = data
+        return len(data)
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def read(self, size=None, num_retries=None):
@@ -1356,3 +1342,33 @@ class ArvadosFileWriter(ArvadosFileReader):
         if not self.closed:
             self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+    """An interface to an Arvados file that's compatible with io wrappers.
+
+    """
+    def __init__(self, f):
+        self.f = f
+        self.closed = False
+    def close(self):
+        self.closed = True
+        return self.f.close()
+    def flush(self):
+        return self.f.flush()
+    def read(self, *args, **kwargs):
+        return self.f.read(*args, **kwargs)
+    def readable(self):
+        return self.f.readable()
+    def readinto(self, *args, **kwargs):
+        return self.f.readinto(*args, **kwargs)
+    def seek(self, *args, **kwargs):
+        return self.f.seek(*args, **kwargs)
+    def seekable(self):
+        return self.f.seekable()
+    def tell(self):
+        return self.f.tell()
+    def writable(self):
+        return self.f.writable()
+    def write(self, *args, **kwargs):
+        return self.f.write(*args, **kwargs)