self.apiconfig = apiconfig
self.collection = collection
self.collection_root = collection_root
+ self.collection_record_file = None
def new_entry(self, name, item, mtime):
name = self.sanitize_filename(name)
super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
self.api = api
self.num_retries = num_retries
- self.collection_record_file = None
self._poll = True
try:
self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
self.collection.update()
new_collection_record = self.collection.api_response()
else:
+ # If there are too many prefetch threads and you
+ # max out the CPU, delivering data to the FUSE
+ # layer actually ends up being slower.
+ # Experimentally, capping at 7 threads seems to
+ # be a sweet spot.
+ get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
# Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
# save to the backend
super(TmpCollectionDirectory, self).__init__(
parent_inode, inodes, api_client.config, True, collection, self)
- self.collection_record_file = None
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
- if self.collection_record_file:
+ if self.collection_record_file is None:
+ return
- # See discussion in CollectionDirectoryBase.on_event
- lockcount = 0
- try:
- while True:
- self.collection.lock.release()
- lockcount += 1
- except RuntimeError:
- pass
+ # See discussion in CollectionDirectoryBase.on_event
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
- try:
- with llfuse.lock:
- with self.collection.lock:
- self.collection_record_file.invalidate()
- self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
- finally:
- while lockcount > 0:
- self.collection.lock.acquire()
- lockcount -= 1
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record", self)
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def collection_record(self):
with llfuse.lock_released: