X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ca56c80b05906c110d63b724505684c450c098d5..4090574822afd6a7b48ccd277ba84c3cc6244e71:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index a22df2c031..f3816c0d3e 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -275,6 +275,7 @@ class CollectionDirectoryBase(Directory): self.apiconfig = apiconfig self.collection = collection self.collection_root = collection_root + self.collection_record_file = None def new_entry(self, name, item, mtime): name = self.sanitize_filename(name) @@ -437,7 +438,6 @@ class CollectionDirectory(CollectionDirectoryBase): super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self) self.api = api self.num_retries = num_retries - self.collection_record_file = None self._poll = True try: self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2) @@ -525,15 +525,23 @@ class CollectionDirectory(CollectionDirectoryBase): self.collection.update() new_collection_record = self.collection.api_response() else: + # If there's too many prefetch threads and you + # max out the CPU, delivering data to the FUSE + # layer actually ends up being slower. + # Experimentally, capping 7 threads seems to + # be a sweet spot. + get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7) # Create a new collection object if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) + num_retries=self.num_retries, + get_threads=get_threads) else: coll_reader = arvados.collection.CollectionReader( self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) + num_retries=self.num_retries, + get_threads=get_threads) new_collection_record = coll_reader.api_response() or {} # If the Collection only exists in Keep, there will be no API # response. Fill in the fields we need. @@ -647,32 +655,32 @@ class TmpCollectionDirectory(CollectionDirectoryBase): # save to the backend super(TmpCollectionDirectory, self).__init__( parent_inode, inodes, api_client.config, True, collection, self) - self.collection_record_file = None self.populate(self.mtime()) def on_event(self, *args, **kwargs): super(TmpCollectionDirectory, self).on_event(*args, **kwargs) - if self.collection_record_file: + if self.collection_record_file is None: + return - # See discussion in CollectionDirectoryBase.on_event - lockcount = 0 - try: - while True: - self.collection.lock.release() - lockcount += 1 - except RuntimeError: - pass + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass - try: - with llfuse.lock: - with self.collection.lock: - self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file) - _logger.debug("%s invalidated collection record", self) - finally: - while lockcount > 0: - self.collection.lock.acquire() - lockcount -= 1 + try: + with llfuse.lock: + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: