"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
+ def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
self.apiconfig = apiconfig
self.collection = collection
+ self.collection_root = collection_root
+ self.collection_record_file = None
def new_entry(self, name, item, mtime):
name = self.sanitize_filename(name)
item.fuse_entry.dead = False
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
- self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
+ self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
self._entries[name].populate(mtime)
else:
self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
item.fuse_entry = self._entries[name]
def on_event(self, event, collection, name, item):
+ # These are events from the Collection object (ADD/DEL/MOD)
+ # emitted by operations on the Collection object (like
+ # "mkdirs" or "remove"), and by "update", which we need to
+ # synchronize with our FUSE objects that are assigned inodes.
if collection == self.collection:
name = self.sanitize_filename(name)
self.inodes.invalidate_inode(item.fuse_entry)
elif name in self._entries:
self.inodes.invalidate_inode(self._entries[name])
+
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
finally:
while lockcount > 0:
self.collection.lock.acquire()
@use_counter
def flush(self):
- if not self.writable():
- return
- with llfuse.lock_released:
- self.collection.root_collection().save()
+ self.collection_root.flush()
@use_counter
@check_update
"""Represents the root of a directory tree representing a collection."""
def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
self.api = api
self.num_retries = num_retries
- self.collection_record_file = None
- self.collection_record = None
self._poll = True
try:
self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
def writable(self):
return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
+ @use_counter
+ def flush(self):
+ if not self.writable():
+ return
+ with llfuse.lock_released:
+ with self._updating_lock:
+ if self.collection.committed():
+ self.collection.update()
+ else:
+ self.collection.save()
+ self.new_collection_record(self.collection.api_response())
+
def want_event_subscribe(self):
return (uuid_pattern.match(self.collection_locator) is not None)
- # Used by arv-web.py to switch the contents of the CollectionDirectory
- def change_collection(self, new_locator):
- """Switch the contents of the CollectionDirectory.
-
- Must be called with llfuse.lock held.
- """
-
- self.collection_locator = new_locator
- self.collection_record = None
- self.update()
-
def new_collection(self, new_collection_record, coll_reader):
if self.inode:
self.clear()
-
- self.collection_record = new_collection_record
-
- if self.collection_record:
- self._mtime = convertTime(self.collection_record.get('modified_at'))
- self.collection_locator = self.collection_record["uuid"]
- if self.collection_record_file is not None:
- self.collection_record_file.update(self.collection_record)
-
self.collection = coll_reader
+ self.new_collection_record(new_collection_record)
self.populate(self.mtime())
+ def new_collection_record(self, new_collection_record):
+ if not new_collection_record:
+ raise Exception("invalid new_collection_record")
+ self._mtime = convertTime(new_collection_record.get('modified_at'))
+ self._manifest_size = len(new_collection_record["manifest_text"])
+ self.collection_locator = new_collection_record["uuid"]
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record file", self)
+ self.fresh()
+
def uuid(self):
return self.collection_locator
@use_counter
- def update(self, to_record_version=None):
+ def update(self):
try:
- if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
+ if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
+ # It's immutable, nothing to update
return True
if self.collection_locator is None:
+ # No collection locator to retrieve from
self.fresh()
return True
+ new_collection_record = None
try:
with llfuse.lock_released:
self._updating_lock.acquire()
if not self.stale():
- return
+ return True
- _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
- new_collection_record = None
+ _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
+ coll_reader = None
if self.collection is not None:
- if self.collection.known_past_version(to_record_version):
- _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
- else:
- self.collection.update()
+ # Already have a collection object
+ self.collection.update()
+ new_collection_record = self.collection.api_response()
else:
+ # If there's too many prefetch threads and you
+ # max out the CPU, delivering data to the FUSE
+ # layer actually ends up being slower.
+ # Experimentally, capping 7 threads seems to
+ # be a sweet spot.
+ get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
+ # Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
# end with llfuse.lock_released, re-acquire lock
- if (new_collection_record is not None and
- (self.collection_record is None or
- self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
- self.new_collection(new_collection_record, coll_reader)
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
- self.fresh()
+ if new_collection_record is not None:
+ if coll_reader is not None:
+ self.new_collection(new_collection_record, coll_reader)
+ else:
+ self.new_collection_record(new_collection_record)
+
return True
finally:
self._updating_lock.release()
_logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
except arvados.errors.ArgumentError as detail:
_logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
- if self.collection_record is not None and "manifest_text" in self.collection_record:
- _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+ if new_collection_record is not None and "manifest_text" in new_collection_record:
+ _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
except Exception:
_logger.exception("arv-mount %s: error", self.collection_locator)
- if self.collection_record is not None and "manifest_text" in self.collection_record:
- _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+ if new_collection_record is not None and "manifest_text" in new_collection_record:
+ _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
self.invalidate()
return False
+ @use_counter
+ def collection_record(self):
+ self.flush()
+ return self.collection.api_response()
+
@use_counter
@check_update
def __getitem__(self, item):
if item == '.arvados#collection':
if self.collection_record_file is None:
- self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+ self.collection_record_file = FuncToJSONFile(
+ self.inode, self.collection_record)
self.inodes.add_entry(self.collection_record_file)
+ self.invalidate() # use lookup as a signal to force update
return self.collection_record_file
else:
return super(CollectionDirectory, self).__getitem__(item)
return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
- self.collection_record = None
- self.collection_record_file = None
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
super(CollectionDirectory, self).invalidate()
def persisted(self):
# This is always enable_write=True because it never tries to
# save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, True, collection)
- self.collection_record_file = None
+ parent_inode, inodes, api_client.config, True, collection, self)
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
- if self.collection_record_file:
+ if self.collection_record_file is None:
+ return
- # See discussion in CollectionDirectoryBase.on_event
- lockcount = 0
- try:
- while True:
- self.collection.lock.release()
- lockcount += 1
- except RuntimeError:
- pass
+ # See discussion in CollectionDirectoryBase.on_event
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
- try:
- with llfuse.lock:
- with self.collection.lock:
- self.collection_record_file.invalidate()
- self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
- finally:
- while lockcount > 0:
- self.collection.lock.acquire()
- lockcount -= 1
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record", self)
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def collection_record(self):
with llfuse.lock_released:
def writable(self):
return True
+ def flush(self):
+ pass
+
def want_event_subscribe(self):
return False
uuid=self.project_uuid,
filters=[["uuid", "is_a", "arvados#group"],
["groups.group_class", "in", ["project","filter"]]]))
- contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
+ contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
+ arvados.util.keyset_list_all(self.api.groups().contents,
order_key="uuid",
num_retries=self.num_retries,
uuid=self.project_uuid,
- filters=[["uuid", "is_a", "arvados#collection"]]))
+ filters=[["uuid", "is_a", "arvados#collection"]])))
+
# end with llfuse.lock_released, re-acquire lock