# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
-_disallowed_filename_characters = re.compile('[\x00/]')
+_disallowed_filename_characters = re.compile(r'[\x00/]')
class Directory(FreshBase):
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes, apiconfig):
+ __slots__ = ("inode", "parent_inode", "inodes", "_entries", "_mtime", "_enable_write", "_filters")
+
+ def __init__(self, parent_inode, inodes, enable_write, filters):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
raise Exception("parent_inode should be an int")
self.parent_inode = parent_inode
self.inodes = inodes
- self.apiconfig = apiconfig
self._entries = {}
self._mtime = time.time()
-
- def forward_slash_subst(self):
- if not hasattr(self, '_fsns'):
- self._fsns = None
- config = self.apiconfig()
- try:
- self._fsns = config["Collections"]["ForwardSlashNameSubstitution"]
- except KeyError:
- # old API server with no FSNS config
- self._fsns = '_'
+ self._enable_write = enable_write
+ self._filters = filters or []
+
+ def _filters_for(self, subtype, *, qualified):
+ for f in self._filters:
+ f_type, _, f_name = f[0].partition('.')
+ if not f_name:
+ yield f
+ elif f_type != subtype:
+ pass
+ elif qualified:
+ yield f
else:
- if self._fsns == '' or self._fsns == '/':
- self._fsns = None
- return self._fsns
+ yield [f_name, *f[1:]]
def unsanitize_filename(self, incoming):
"""Replace ForwardSlashNameSubstitution value with /"""
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
return incoming.replace(fsns, '/')
else:
elif dirty == '..':
return '__'
else:
- fsns = self.forward_slash_subst()
+ fsns = self.inodes.forward_slash_subst()
if isinstance(fsns, str):
dirty = dirty.replace('/', fsns)
return _disallowed_filename_characters.sub('_', dirty)
self.inodes.touch(self)
super(Directory, self).fresh()
+ def objsize(self):
+ # Rough estimate of memory footprint based on using pympler
+ return len(self._entries) * 1024
+
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
entries that are the same in both the old and new lists, create new
entries, and delete old entries missing from the new list.
- :items: iterable with new directory contents
+ Arguments:
+ * items: Iterable --- New directory contents
- :fn: function to take an entry in 'items' and return the desired file or
+ * fn: Callable --- Takes an entry in 'items' and returns the desired file or
directory name, or None if this entry should be skipped
- :same: function to compare an existing entry (a File or Directory
+ * same: Callable --- Compare an existing entry (a File or Directory
object) with an entry in the items list to determine whether to keep
the existing entry.
- :new_entry: function to create a new directory entry (File or Directory
+ * new_entry: Callable --- Create a new directory entry (File or Directory
object) from an entry in the items list.
"""
changed = False
for i in items:
name = self.sanitize_filename(fn(i))
- if name:
- if name in oldentries and same(oldentries[name], i):
+ if not name:
+ continue
+ if name in oldentries:
+ ent = oldentries[name]
+ if same(ent, i) and ent.parent_inode == self.inode:
# move existing directory entry over
- self._entries[name] = oldentries[name]
+ self._entries[name] = ent
del oldentries[name]
- else:
- _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
- # create new directory entry
- ent = new_entry(i)
- if ent is not None:
- self._entries[name] = self.inodes.add_entry(ent)
- changed = True
+ self.inodes.inode_cache.touch(ent)
+
+ for i in items:
+ name = self.sanitize_filename(fn(i))
+ if not name:
+ continue
+ if name not in self._entries:
+ # create new directory entry
+ ent = new_entry(i)
+ if ent is not None:
+ self._entries[name] = self.inodes.add_entry(ent)
+ # need to invalidate this just in case there was a
+ # previous entry that couldn't be moved over or a
+ # lookup that returned file not found and cached
+ # a negative result
+ self.inodes.invalidate_entry(self, name)
+ changed = True
+ _logger.debug("Added entry '%s' as inode %i to parent inode %i", name, ent.inode, self.inode)
# delete any other directory entries that were not found in 'items'
- for i in oldentries:
- _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
- self.inodes.invalidate_entry(self, i)
- self.inodes.del_entry(oldentries[i])
+ for name, ent in oldentries.items():
+ _logger.debug("Detaching entry '%s' from parent_inode %i", name, self.inode)
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
changed = True
if changed:
- self.inodes.invalidate_inode(self)
self._mtime = time.time()
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
return True
return False
- def has_ref(self, only_children):
- if super(Directory, self).has_ref(only_children):
- return True
- for v in self._entries.values():
- if v.has_ref(False):
- return True
- return False
-
def clear(self):
"""Delete all entries"""
+ if not self._entries:
+ return
oldentries = self._entries
self._entries = {}
- for n in oldentries:
- oldentries[n].clear()
- self.inodes.del_entry(oldentries[n])
self.invalidate()
+ for name, ent in oldentries.items():
+ ent.clear()
+ self.inodes.invalidate_entry(self, name)
+ self.inodes.del_entry(ent)
+ self.inodes.inode_cache.update_cache_size(self)
def kernel_invalidate(self):
# Invalidating the dentry on the parent implies invalidating all paths
# below it as well.
- parent = self.inodes[self.parent_inode]
+ if self.parent_inode in self.inodes:
+ parent = self.inodes[self.parent_inode]
+ else:
+ # parent was removed already.
+ return
# Find self on the parent in order to invalidate this path.
# Calling the public items() method might trigger a refresh,
"""
- def __init__(self, parent_inode, inodes, apiconfig, collection):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
- self.apiconfig = apiconfig
+ __slots__ = ("collection", "collection_root", "collection_record_file")
+
+ def __init__(self, parent_inode, inodes, enable_write, filters, collection, collection_root):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, enable_write, filters)
self.collection = collection
+ self.collection_root = collection_root
+ self.collection_record_file = None
def new_entry(self, name, item, mtime):
name = self.sanitize_filename(name)
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
- if item.fuse_entry.dead is not True:
- raise Exception("Can only reparent dead inode entry")
+ if item.fuse_entry.parent_inode is not None:
+ raise Exception("Can only reparent unparented inode entry")
if item.fuse_entry.inode is None:
raise Exception("Reparented entry must still have valid inode")
- item.fuse_entry.dead = False
+ item.fuse_entry.parent_inode = self.inode
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
- self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
+ self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
+ self.inode,
+ self.inodes,
+ self._enable_write,
+ self._filters,
+ item,
+ self.collection_root,
+ ))
self._entries[name].populate(mtime)
else:
- self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+ self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
item.fuse_entry = self._entries[name]
def on_event(self, event, collection, name, item):
+ # These are events from the Collection object (ADD/DEL/MOD)
+ # emitted by operations on the Collection object (like
+ # "mkdirs" or "remove"), and by "update", which we need to
+ # synchronize with our FUSE objects that are assigned inodes.
if collection == self.collection:
name = self.sanitize_filename(name)
self.inodes.invalidate_inode(item.fuse_entry)
elif name in self._entries:
self.inodes.invalidate_inode(self._entries[name])
+
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
finally:
while lockcount > 0:
self.collection.lock.acquire()
self.new_entry(entry, item, self.mtime())
def writable(self):
- return self.collection.writable()
+ return self._enable_write and self.collection.writable()
@use_counter
def flush(self):
- with llfuse.lock_released:
- self.collection.root_collection().save()
+ self.collection_root.flush()
@use_counter
@check_update
def create(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.open(name, "w").close()
@use_counter
@check_update
def mkdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.mkdirs(name)
@use_counter
@check_update
def unlink(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.remove(name)
self.flush()
@use_counter
@check_update
def rmdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.remove(name)
self.flush()
@use_counter
@check_update
def rename(self, name_old, name_new, src):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if not isinstance(src, CollectionDirectoryBase):
raise llfuse.FUSEError(errno.EPERM)
def clear(self):
super(CollectionDirectoryBase, self).clear()
+ if self.collection is not None:
+ self.collection.unsubscribe()
self.collection = None
+ def objsize(self):
+ # objsize for the whole collection is represented at the root,
+ # don't double-count it
+ return 0
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
- def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
+ __slots__ = ("api", "num_retries", "collection_locator",
+ "_manifest_size", "_writable", "_updating_lock")
+
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, enable_write, filters, None, self)
self.api = api
self.num_retries = num_retries
- self.collection_record_file = None
- self.collection_record = None
self._poll = True
try:
self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2)
self._mtime = 0
self._manifest_size = 0
if self.collection_locator:
- self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+ self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
self._updating_lock = threading.Lock()
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
def writable(self):
- return self.collection.writable() if self.collection is not None else self._writable
+ return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
+
+ @use_counter
+ def flush(self):
+ if not self.writable():
+ return
+ with llfuse.lock_released:
+ with self._updating_lock:
+ if self.collection.committed():
+ self.collection.update()
+ else:
+ self.collection.save()
+ self.new_collection_record(self.collection.api_response())
def want_event_subscribe(self):
return (uuid_pattern.match(self.collection_locator) is not None)
- # Used by arv-web.py to switch the contents of the CollectionDirectory
- def change_collection(self, new_locator):
- """Switch the contents of the CollectionDirectory.
-
- Must be called with llfuse.lock held.
- """
-
- self.collection_locator = new_locator
- self.collection_record = None
- self.update()
-
def new_collection(self, new_collection_record, coll_reader):
if self.inode:
self.clear()
-
- self.collection_record = new_collection_record
-
- if self.collection_record:
- self._mtime = convertTime(self.collection_record.get('modified_at'))
- self.collection_locator = self.collection_record["uuid"]
- if self.collection_record_file is not None:
- self.collection_record_file.update(self.collection_record)
-
self.collection = coll_reader
+ self.new_collection_record(new_collection_record)
self.populate(self.mtime())
+ def new_collection_record(self, new_collection_record):
+ if not new_collection_record:
+ raise Exception("invalid new_collection_record")
+ self._mtime = convertTime(new_collection_record.get('modified_at'))
+ self._manifest_size = len(new_collection_record["manifest_text"])
+ self.collection_locator = new_collection_record["uuid"]
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("parent_inode %s invalidated collection record file inode %s", self.inode,
+ self.collection_record_file.inode)
+ self.inodes.update_uuid(self)
+ self.inodes.inode_cache.update_cache_size(self)
+ self.fresh()
+
def uuid(self):
return self.collection_locator
@use_counter
- def update(self, to_record_version=None):
+ def update(self):
try:
- if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
+ if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator):
+ # It's immutable, nothing to update
return True
if self.collection_locator is None:
+ # No collection locator to retrieve from
self.fresh()
return True
+ new_collection_record = None
try:
with llfuse.lock_released:
self._updating_lock.acquire()
if not self.stale():
- return
+ return True
- _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version)
- new_collection_record = None
+ _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode)
+ coll_reader = None
if self.collection is not None:
- if self.collection.known_past_version(to_record_version):
- _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
- else:
- self.collection.update()
+ # Already have a collection object
+ self.collection.update()
+ new_collection_record = self.collection.api_response()
else:
+ # Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired()
# end with llfuse.lock_released, re-acquire lock
- if (new_collection_record is not None and
- (self.collection_record is None or
- self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"))):
- self.new_collection(new_collection_record, coll_reader)
- self._manifest_size = len(coll_reader.manifest_text())
- _logger.debug("%s manifest_size %i", self, self._manifest_size)
- self.fresh()
+ if new_collection_record is not None:
+ if coll_reader is not None:
+ self.new_collection(new_collection_record, coll_reader)
+ else:
+ self.new_collection_record(new_collection_record)
+
return True
finally:
self._updating_lock.release()
_logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
except arvados.errors.ArgumentError as detail:
_logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
- if self.collection_record is not None and "manifest_text" in self.collection_record:
- _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+ if new_collection_record is not None and "manifest_text" in new_collection_record:
+ _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
except Exception:
_logger.exception("arv-mount %s: error", self.collection_locator)
- if self.collection_record is not None and "manifest_text" in self.collection_record:
- _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
+ if new_collection_record is not None and "manifest_text" in new_collection_record:
+ _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"])
self.invalidate()
return False
+ @use_counter
+ @check_update
+ def collection_record(self):
+ self.flush()
+ return self.collection.api_response()
+
@use_counter
@check_update
def __getitem__(self, item):
if item == '.arvados#collection':
if self.collection_record_file is None:
- self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+ self.collection_record_file = FuncToJSONFile(
+ self.inode, self.collection_record)
self.inodes.add_entry(self.collection_record_file)
+ self.invalidate() # use lookup as a signal to force update
return self.collection_record_file
else:
return super(CollectionDirectory, self).__getitem__(item)
return super(CollectionDirectory, self).__contains__(k)
def invalidate(self):
- self.collection_record = None
- self.collection_record_file = None
+ if self.collection_record_file is not None:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
super(CollectionDirectory, self).invalidate()
def persisted(self):
return (self.collection_locator is not None)
def objsize(self):
- # This is an empirically-derived heuristic to estimate the memory used
- # to store this collection's metadata. Calculating the memory
- # footprint directly would be more accurate, but also more complicated.
- return self._manifest_size * 128
+ # This is a rough guess of the amount of overhead involved for
+ # a collection; the assumptions are that each file
+ # averages 128 bytes in the manifest, but consumes 1024 bytes
+ # of Python data structures, so 1024/128=8 means we estimate
+ # the RAM footprint at 8 times the size of bare manifest text.
+ return self._manifest_size * 8
def finalize(self):
- if self.collection is not None:
- if self.writable():
+ if self.collection is None:
+ return
+
+ if self.writable():
+ try:
self.collection.save()
- self.collection.stop_threads()
+ except Exception as e:
+ _logger.exception("Failed to save collection %s", self.collection_locator)
+ self.collection.stop_threads()
def clear(self):
if self.collection is not None:
self.collection.stop_threads()
- super(CollectionDirectory, self).clear()
self._manifest_size = 0
+ super(CollectionDirectory, self).clear()
+ if self.collection_record_file is not None:
+ self.inodes.del_entry(self.collection_record_file)
+ self.collection_record_file = None
class TmpCollectionDirectory(CollectionDirectoryBase):
def save_new(self):
pass
- def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
+ def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep,
num_retries=num_retries,
storage_classes_desired=storage_classes)
+ # This is always enable_write=True because it never tries to
+ # save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, collection)
- self.collection_record_file = None
+ parent_inode, inodes, True, filters, collection, self)
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
- if self.collection_record_file:
+ if self.collection_record_file is None:
+ return
- # See discussion in CollectionDirectoryBase.on_event
- lockcount = 0
- try:
- while True:
- self.collection.lock.release()
- lockcount += 1
- except RuntimeError:
- pass
+ # See discussion in CollectionDirectoryBase.on_event
+ lockcount = 0
+ try:
+ while True:
+ self.collection.lock.release()
+ lockcount += 1
+ except RuntimeError:
+ pass
- try:
- with llfuse.lock:
- with self.collection.lock:
- self.collection_record_file.invalidate()
- self.inodes.invalidate_inode(self.collection_record_file)
- _logger.debug("%s invalidated collection record", self)
- finally:
- while lockcount > 0:
- self.collection.lock.acquire()
- lockcount -= 1
+ try:
+ with llfuse.lock:
+ with self.collection.lock:
+ self.collection_record_file.invalidate()
+ self.inodes.invalidate_inode(self.collection_record_file)
+ _logger.debug("%s invalidated collection record", self.inode)
+ finally:
+ while lockcount > 0:
+ self.collection.lock.acquire()
+ lockcount -= 1
def collection_record(self):
with llfuse.lock_released:
def writable(self):
return True
+ def flush(self):
+ pass
+
def want_event_subscribe(self):
return False
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
+ super(MagicDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
# If we're the root directory, add an identical by_id subdirectory.
if self.inode == llfuse.ROOT_INODE:
self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
- self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ self.pdh_only,
+ ))
def __contains__(self, k):
if k in self._entries:
if group_uuid_pattern.match(k):
project = self.api.groups().list(
- filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ ["uuid", "=", k],
+ *self._filters_for('groups', qualified=False),
+ ],
+ ).execute(num_retries=self.num_retries)
if project[u'items_available'] == 0:
return False
e = self.inodes.add_entry(ProjectDirectory(
- self.inode, self.inodes, self.api, self.num_retries,
- project[u'items'][0], storage_classes=self.storage_classes))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ project[u'items'][0],
+ storage_classes=self.storage_classes,
+ ))
else:
e = self.inodes.add_entry(CollectionDirectory(
- self.inode, self.inodes, self.api, self.num_retries, k))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ k,
+ ))
if e.update():
if k not in self._entries:
class TagsDirectory(Directory):
"""A special directory that contains as subdirectories all tags visible to the user."""
- def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
+ super(TagsDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self._poll = True
def update(self):
with llfuse.lock_released:
tags = self.api.links().list(
- filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
- select=['name'], distinct=True, limit=1000
- ).execute(num_retries=self.num_retries)
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '!=', ''],
+ *self._filters_for('links', qualified=False),
+ ],
+ select=['name'],
+ distinct=True,
+ limit=1000,
+ ).execute(num_retries=self.num_retries)
if "items" in tags:
- self.merge(tags['items']+[{"name": n} for n in self._extra],
- lambda i: i['name'],
- lambda a, i: a.tag == i['name'],
- lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
+ self.merge(
+ tags['items']+[{"name": n} for n in self._extra],
+ lambda i: i['name'],
+ lambda a, i: a.tag == i['name'],
+ lambda i: TagDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i['name'],
+ poll=self._poll,
+ poll_time=self._poll_time,
+ ),
+ )
@use_counter
@check_update
return super(TagsDirectory, self).__getitem__(item)
with llfuse.lock_released:
tags = self.api.links().list(
- filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '=', item],
+ *self._filters_for('links', qualified=False),
+ ],
+ limit=1,
).execute(num_retries=self.num_retries)
if tags["items"]:
self._extra.add(item)
to the user that are tagged with a particular tag.
"""
- def __init__(self, parent_inode, inodes, api, num_retries, tag,
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
+ super(TagDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.tag = tag
def update(self):
with llfuse.lock_released:
taggedcollections = self.api.links().list(
- filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']
- ).execute(num_retries=self.num_retries)
- self.merge(taggedcollections['items'],
- lambda i: i['head_uuid'],
- lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection'],
+ *self._filters_for('links', qualified=False),
+ ],
+ select=['head_uuid'],
+ ).execute(num_retries=self.num_retries)
+ self.merge(
+ taggedcollections['items'],
+ lambda i: i['head_uuid'],
+ lambda a, i: a.collection_locator == i['head_uuid'],
+ lambda i: CollectionDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i['head_uuid'],
+ ),
+ )
class ProjectDirectory(Directory):
"""A special directory that contains the contents of a project."""
- def __init__(self, parent_inode, inodes, api, num_retries, project_object,
- poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
+ __slots__ = ("api", "num_retries", "project_object", "project_object_file",
+ "project_uuid", "_updating_lock",
+ "_current_user", "_full_listing", "storage_classes", "recursively_contained")
+
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
+ project_object, poll=True, poll_time=3, storage_classes=None):
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
self._current_user = None
self._full_listing = False
self.storage_classes = storage_classes
+ self.recursively_contained = False
+
+ # Filter groups can contain themselves, which causes tools
+ # that walk the filesystem to get stuck in an infinite loop,
+ # so suppress returning a listing in that case.
+ if self.project_object.get("group_class") == "filter":
+ iter_parent_inode = parent_inode
+ while iter_parent_inode != llfuse.ROOT_INODE:
+ parent_dir = self.inodes[iter_parent_inode]
+ if isinstance(parent_dir, ProjectDirectory) and parent_dir.project_uuid == self.project_uuid:
+ self.recursively_contained = True
+ break
+ iter_parent_inode = parent_dir.parent_inode
def want_event_subscribe(self):
return True
def createDirectory(self, i):
+ common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
+ return CollectionDirectory(*common_args, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
+ return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
+ return CollectionDirectory(*common_args, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
self.project_object_file = ObjectFile(self.inode, self.project_object)
self.inodes.add_entry(self.project_object_file)
- if not self._full_listing:
+ if self.recursively_contained or not self._full_listing:
return True
def samefn(a, i):
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
# do this in 2 steps until #17424 is fixed
- contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
- order_key="uuid",
- num_retries=self.num_retries,
- uuid=self.project_uuid,
- filters=[["uuid", "is_a", "arvados#group"],
- ["groups.group_class", "in", ["project","filter"]]]))
- contents.extend(arvados.util.keyset_list_all(self.api.groups().contents,
- order_key="uuid",
- num_retries=self.num_retries,
- uuid=self.project_uuid,
- filters=[["uuid", "is_a", "arvados#collection"]]))
-
+ contents = list(arvados.util.keyset_list_all(
+ self.api.groups().contents,
+ order_key='uuid',
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[
+ ['uuid', 'is_a', 'arvados#group'],
+ ['groups.group_class', 'in', ['project', 'filter']],
+ *self._filters_for('groups', qualified=True),
+ ],
+ ))
+ contents.extend(obj for obj in arvados.util.keyset_list_all(
+ self.api.groups().contents,
+ order_key='uuid',
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[
+ ['uuid', 'is_a', 'arvados#collection'],
+ *self._filters_for('collections', qualified=True),
+ ],
+ ) if obj['current_version_uuid'] == obj['uuid'])
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
namefilter = ["name", "=", k]
else:
namefilter = ["name", "in", [k, k2]]
- contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "in", ["project","filter"]],
- namefilter],
- limit=2).execute(num_retries=self.num_retries)["items"]
+ contents = self.api.groups().list(
+ filters=[
+ ["owner_uuid", "=", self.project_uuid],
+ ["group_class", "in", ["project","filter"]],
+ namefilter,
+ *self._filters_for('groups', qualified=False),
+ ],
+ limit=2,
+ ).execute(num_retries=self.num_retries)["items"]
if not contents:
- contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
- namefilter],
- limit=2).execute(num_retries=self.num_retries)["items"]
+ contents = self.api.collections().list(
+ filters=[
+ ["owner_uuid", "=", self.project_uuid],
+ namefilter,
+ *self._filters_for('collections', qualified=False),
+ ],
+ limit=2,
+ ).execute(num_retries=self.num_retries)["items"]
if contents:
if len(contents) > 1 and contents[1]['name'] == k:
# If "foo/bar" and "foo[SUBST]bar" both exist, use
@use_counter
@check_update
def writable(self):
+ if not self._enable_write:
+ return False
with llfuse.lock_released:
if not self._current_user:
self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
def persisted(self):
return True
+ def clear(self):
+ super(ProjectDirectory, self).clear()
+ if self.project_object_file is not None:
+ self.inodes.del_entry(self.project_object_file)
+ self.project_object_file = None
+
@use_counter
@check_update
def mkdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
try:
with llfuse.lock_released:
c = {
@use_counter
@check_update
def rmdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if name not in self:
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(self[name], CollectionDirectory):
@use_counter
@check_update
def rename(self, name_old, name_new, src):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if not isinstance(src, ProjectDirectory):
raise llfuse.FUSEError(errno.EPERM)
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
- def __init__(self, parent_inode, inodes, api, num_retries, exclude,
- poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
+ exclude, poll=False, poll_time=60, storage_classes=None):
+ super(SharedDirectory, self).__init__(parent_inode, inodes, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
if 'httpMethod' in methods.get('shared', {}):
page = []
while True:
- resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
- order="uuid",
- limit=10000,
- count="none",
- include="owner_uuid").execute()
+ resp = self.api.groups().shared(
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ *page,
+ *self._filters_for('groups', qualified=False),
+ ],
+ order="uuid",
+ limit=10000,
+ count="none",
+ include="owner_uuid",
+ ).execute()
if not resp["items"]:
break
page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
self.api.groups().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['group_class','in',['project','filter']]],
- select=["uuid", "owner_uuid"]))
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ *self._filters_for('groups', qualified=False),
+ ],
+ select=["uuid", "owner_uuid"],
+ ))
for ob in all_projects:
objects[ob['uuid']] = ob
self.api.users().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['uuid','in', list(root_owners)]])
+ filters=[
+ ['uuid', 'in', list(root_owners)],
+ *self._filters_for('users', qualified=False),
+ ],
+ )
lgroups = arvados.util.keyset_list_all(
self.api.groups().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['uuid','in', list(root_owners)+roots]])
-
+ filters=[
+ ['uuid', 'in', list(root_owners)+roots],
+ *self._filters_for('groups', qualified=False),
+ ],
+ )
for l in lusers:
objects[l["uuid"]] = l
for l in lgroups:
# end with llfuse.lock_released, re-acquire lock
- self.merge(contents.items(),
- lambda i: i[0],
- lambda a, i: a.uuid() == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
+ self.merge(
+ contents.items(),
+ lambda i: i[0],
+ lambda a, i: a.uuid() == i[1]['uuid'],
+ lambda i: ProjectDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i[1],
+ poll=self._poll,
+ poll_time=self._poll_time,
+ storage_classes=self.storage_classes,
+ ),
+ )
except Exception:
_logger.exception("arv-mount shared dir error")
finally: