# Match any character which FUSE or Linux cannot accommodate as part
# of a filename. (If present in a collection filename, they will
# appear as underscores in the fuse mount.)
-_disallowed_filename_characters = re.compile('[\x00/]')
+_disallowed_filename_characters = re.compile(r'[\x00/]')
class Directory(FreshBase):
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write):
+ def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
self._entries = {}
self._mtime = time.time()
self._enable_write = enable_write
+ self._filters = filters or []
+
+ def _filters_for(self, subtype, *, qualified):
+ for f in self._filters:
+ f_type, _, f_name = f[0].partition('.')
+ if not f_name:
+ yield f
+ elif f_type != subtype:
+ pass
+ elif qualified:
+ yield f
+ else:
+ yield [f_name, *f[1:]]
def forward_slash_subst(self):
if not hasattr(self, '_fsns'):
self.inodes.touch(self)
super(Directory, self).fresh()
+ def objsize(self):
+ return len(self._entries) * 64
+
def merge(self, items, fn, same, new_entry):
"""Helper method for updating the contents of the directory.
changed = False
for i in items:
name = self.sanitize_filename(fn(i))
- if name:
- if name in oldentries and same(oldentries[name], i):
+ if not name:
+ continue
+            if name in oldentries:
+                ent = oldentries[name]
+                if same(ent, i):
+                    # Hold a use ref on every entry kept in self._entries so
+                    # cap_cache() below cannot evict it mid-merge; released by
+                    # the dec_use() loop after the merge completes.
+                    ent.inc_use()
                     # move existing directory entry over
-                    self._entries[name] = oldentries[name]
+                    self._entries[name] = ent
                     del oldentries[name]
-                else:
-                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
-                    # create new directory entry
-                    ent = new_entry(i)
-                    if ent is not None:
-                        self._entries[name] = self.inodes.add_entry(ent)
-                        changed = True
+
+        for i in items:
+            name = self.sanitize_filename(fn(i))
+            if not name:
+                continue
+            if name not in self._entries:
+                _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
+                # create new directory entry
+                ent = new_entry(i)
+                if ent is not None:
+                    ent.inc_use()
+                    self._entries[name] = self.inodes.add_entry(ent)
+                    changed = True
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
             _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
             self.inodes.invalidate_entry(self, i)
             self.inodes.del_entry(oldentries[i])
changed = True
if changed:
self.inodes.invalidate_inode(self)
self._mtime = time.time()
+ self.inodes.inode_cache.update_cache_size(self)
+ self.inodes.inode_cache.cap_cache()
+
+ for ent in self._entries.values():
+ ent.dec_use()
self.fresh()
oldentries = self._entries
self._entries = {}
for n in oldentries:
- oldentries[n].clear()
self.inodes.del_entry(oldentries[n])
self.invalidate()
"""
- def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
+ def __init__(self, parent_inode, inodes, apiconfig, enable_write, filters, collection, collection_root):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write, filters)
self.apiconfig = apiconfig
self.collection = collection
self.collection_root = collection_root
item.fuse_entry.dead = False
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
- self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root))
+ self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(
+ self.inode,
+ self.inodes,
+ self.apiconfig,
+ self._enable_write,
+ self._filters,
+ item,
+ self.collection_root,
+ ))
self._entries[name].populate(mtime)
else:
self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters=None, collection_record=None, explicit_collection=None):
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters, None, self)
self.api = api
self.num_retries = num_retries
self._poll = True
self.collection_record_file.invalidate()
self.inodes.invalidate_inode(self.collection_record_file)
_logger.debug("%s invalidated collection record file", self)
+ self.inodes.inode_cache.update_cache_size(self)
self.fresh()
def uuid(self):
self.collection.update()
new_collection_record = self.collection.api_response()
else:
- # experimentally, 4 block prefetch seems to be optimal.
- get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 4)
# Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries,
- get_threads=get_threads)
+ num_retries=self.num_retries)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries,
- get_threads=get_threads)
+ num_retries=self.num_retries)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
self.collection.stop_threads()
super(CollectionDirectory, self).clear()
self._manifest_size = 0
+ self.inodes.inode_cache.update_cache_size(self)
class TmpCollectionDirectory(CollectionDirectoryBase):
def save_new(self):
pass
- def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
+ def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, filters=None, storage_classes=None):
collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep,
# This is always enable_write=True because it never tries to
# save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, True, collection, self)
+ parent_inode, inodes, api_client.config, True, filters, collection, self)
self.populate(self.mtime())
def on_event(self, *args, **kwargs):
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, pdh_only=False, storage_classes=None):
+ super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
# If we're the root directory, add an identical by_id subdirectory.
if self.inode == llfuse.ROOT_INODE:
self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
- self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
- self.pdh_only))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ self.pdh_only,
+ ))
def __contains__(self, k):
if k in self._entries:
if group_uuid_pattern.match(k):
project = self.api.groups().list(
- filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ ["uuid", "=", k],
+ *self._filters_for('groups', qualified=False),
+ ],
+ ).execute(num_retries=self.num_retries)
if project[u'items_available'] == 0:
return False
e = self.inodes.add_entry(ProjectDirectory(
- self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
- project[u'items'][0], storage_classes=self.storage_classes))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ project[u'items'][0],
+ storage_classes=self.storage_classes,
+ ))
else:
e = self.inodes.add_entry(CollectionDirectory(
- self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ k,
+ ))
if e.update():
if k not in self._entries:
class TagsDirectory(Directory):
"""A special directory that contains as subdirectories all tags visible to the user."""
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, poll_time=60):
+ super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
self.api = api
self.num_retries = num_retries
self._poll = True
def update(self):
with llfuse.lock_released:
tags = self.api.links().list(
- filters=[['link_class', '=', 'tag'], ["name", "!=", ""]],
- select=['name'], distinct=True, limit=1000
- ).execute(num_retries=self.num_retries)
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '!=', ''],
+ *self._filters_for('links', qualified=False),
+ ],
+ select=['name'],
+ distinct=True,
+ limit=1000,
+ ).execute(num_retries=self.num_retries)
if "items" in tags:
- self.merge(tags['items']+[{"name": n} for n in self._extra],
- lambda i: i['name'],
- lambda a, i: a.tag == i['name'],
- lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
- i['name'], poll=self._poll, poll_time=self._poll_time))
+ self.merge(
+ tags['items']+[{"name": n} for n in self._extra],
+ lambda i: i['name'],
+ lambda a, i: a.tag == i['name'],
+ lambda i: TagDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i['name'],
+ poll=self._poll,
+ poll_time=self._poll_time,
+ ),
+ )
@use_counter
@check_update
return super(TagsDirectory, self).__getitem__(item)
with llfuse.lock_released:
tags = self.api.links().list(
- filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '=', item],
+ *self._filters_for('links', qualified=False),
+ ],
+ limit=1,
).execute(num_retries=self.num_retries)
if tags["items"]:
self._extra.add(item)
to the user that are tagged with a particular tag.
"""
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+ super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.tag = tag
def update(self):
with llfuse.lock_released:
taggedcollections = self.api.links().list(
- filters=[['link_class', '=', 'tag'],
- ['name', '=', self.tag],
- ['head_uuid', 'is_a', 'arvados#collection']],
- select=['head_uuid']
- ).execute(num_retries=self.num_retries)
- self.merge(taggedcollections['items'],
- lambda i: i['head_uuid'],
- lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
+ filters=[
+ ['link_class', '=', 'tag'],
+ ['name', '=', self.tag],
+ ['head_uuid', 'is_a', 'arvados#collection'],
+ *self._filters_for('links', qualified=False),
+ ],
+ select=['head_uuid'],
+ ).execute(num_retries=self.num_retries)
+ self.merge(
+ taggedcollections['items'],
+ lambda i: i['head_uuid'],
+ lambda a, i: a.collection_locator == i['head_uuid'],
+ lambda i: CollectionDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i['head_uuid'],
+ ),
+ )
class ProjectDirectory(Directory):
"""A special directory that contains the contents of a project."""
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
- poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
+ project_object, poll=True, poll_time=3, storage_classes=None):
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
return True
def createDirectory(self, i):
+ common_args = (self.inode, self.inodes, self.api, self.num_retries, self._enable_write, self._filters)
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
+ return CollectionDirectory(*common_args, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
- i, self._poll, self._poll_time, self.storage_classes)
+ return ProjectDirectory(*common_args, i, self._poll, self._poll_time, self.storage_classes)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
+ return CollectionDirectory(*common_args, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
try:
with llfuse.lock_released:
+ _logger.debug("Getting lock to update %s", self.project_uuid)
self._updating_lock.acquire()
if not self.stale():
+ _logger.debug("%s was updated already", self.project_uuid)
return
+ _logger.debug("Requesting update of %s", self.project_uuid)
+
if group_uuid_pattern.match(self.project_uuid):
self.project_object = self.api.groups().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
self.project_object = self.api.users().get(
uuid=self.project_uuid).execute(num_retries=self.num_retries)
# do this in 2 steps until #17424 is fixed
- contents = list(arvados.util.keyset_list_all(self.api.groups().contents,
- order_key="uuid",
- num_retries=self.num_retries,
- uuid=self.project_uuid,
- filters=[["uuid", "is_a", "arvados#group"],
- ["groups.group_class", "in", ["project","filter"]]]))
- contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"],
- arvados.util.keyset_list_all(self.api.groups().contents,
- order_key="uuid",
- num_retries=self.num_retries,
- uuid=self.project_uuid,
- filters=[["uuid", "is_a", "arvados#collection"]])))
-
-
+ contents = list(arvados.util.keyset_list_all(
+ self.api.groups().contents,
+ order_key='uuid',
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[
+ ['uuid', 'is_a', 'arvados#group'],
+ ['groups.group_class', 'in', ['project', 'filter']],
+ *self._filters_for('groups', qualified=True),
+ ],
+ ))
+ contents.extend(obj for obj in arvados.util.keyset_list_all(
+ self.api.groups().contents,
+ order_key='uuid',
+ num_retries=self.num_retries,
+ uuid=self.project_uuid,
+ filters=[
+ ['uuid', 'is_a', 'arvados#collection'],
+ *self._filters_for('collections', qualified=True),
+ ],
+ ) if obj['current_version_uuid'] == obj['uuid'])
# end with llfuse.lock_released, re-acquire lock
self.merge(contents,
namefilter = ["name", "=", k]
else:
namefilter = ["name", "in", [k, k2]]
- contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid],
- ["group_class", "in", ["project","filter"]],
- namefilter],
- limit=2).execute(num_retries=self.num_retries)["items"]
+ contents = self.api.groups().list(
+ filters=[
+ ["owner_uuid", "=", self.project_uuid],
+ ["group_class", "in", ["project","filter"]],
+ namefilter,
+ *self._filters_for('groups', qualified=False),
+ ],
+ limit=2,
+ ).execute(num_retries=self.num_retries)["items"]
if not contents:
- contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid],
- namefilter],
- limit=2).execute(num_retries=self.num_retries)["items"]
+ contents = self.api.collections().list(
+ filters=[
+ ["owner_uuid", "=", self.project_uuid],
+ namefilter,
+ *self._filters_for('collections', qualified=False),
+ ],
+ limit=2,
+ ).execute(num_retries=self.num_retries)["items"]
if contents:
if len(contents) > 1 and contents[1]['name'] == k:
# If "foo/bar" and "foo[SUBST]bar" both exist, use
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
- def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
- poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, filters,
+ exclude, poll=False, poll_time=60, storage_classes=None):
+ super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, filters)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
if 'httpMethod' in methods.get('shared', {}):
page = []
while True:
- resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page,
- order="uuid",
- limit=10000,
- count="none",
- include="owner_uuid").execute()
+ resp = self.api.groups().shared(
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ *page,
+ *self._filters_for('groups', qualified=False),
+ ],
+ order="uuid",
+ limit=10000,
+ count="none",
+ include="owner_uuid",
+ ).execute()
if not resp["items"]:
break
page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]]
self.api.groups().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['group_class','in',['project','filter']]],
- select=["uuid", "owner_uuid"]))
+ filters=[
+ ['group_class', 'in', ['project','filter']],
+ *self._filters_for('groups', qualified=False),
+ ],
+ select=["uuid", "owner_uuid"],
+ ))
for ob in all_projects:
objects[ob['uuid']] = ob
self.api.users().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['uuid','in', list(root_owners)]])
+ filters=[
+ ['uuid', 'in', list(root_owners)],
+ *self._filters_for('users', qualified=False),
+ ],
+ )
lgroups = arvados.util.keyset_list_all(
self.api.groups().list,
order_key="uuid",
num_retries=self.num_retries,
- filters=[['uuid','in', list(root_owners)+roots]])
-
+ filters=[
+ ['uuid', 'in', list(root_owners)+roots],
+ *self._filters_for('groups', qualified=False),
+ ],
+ )
for l in lusers:
objects[l["uuid"]] = l
for l in lgroups:
# end with llfuse.lock_released, re-acquire lock
- self.merge(contents.items(),
- lambda i: i[0],
- lambda a, i: a.uuid() == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
- i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
+ self.merge(
+ contents.items(),
+ lambda i: i[0],
+ lambda a, i: a.uuid() == i[1]['uuid'],
+ lambda i: ProjectDirectory(
+ self.inode,
+ self.inodes,
+ self.api,
+ self.num_retries,
+ self._enable_write,
+ self._filters,
+ i[1],
+ poll=self._poll,
+ poll_time=self._poll_time,
+ storage_classes=self.storage_classes,
+ ),
+ )
except Exception:
_logger.exception("arv-mount shared dir error")
finally: