X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0561bd0c3c07257fd58ded6c7cfa5feeae97af57..cdf63c1c50a044fe66f224b5cf36b632ecff12a5:/services/fuse/arvados_fuse/fusedir.py?ds=sidebyside diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 0361ffeab6..7de95a0cb1 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -2,20 +2,20 @@ # # SPDX-License-Identifier: AGPL-3.0 -import logging -import re -import time -import llfuse -import arvados import apiclient +import arvados +import errno import functools +import llfuse +import logging +import re +import sys import threading -from apiclient import errors as apiclient_errors -import errno import time +from apiclient import errors as apiclient_errors -from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile -from fresh import FreshBase, convertTime, use_counter, check_update +from .fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile +from .fresh import FreshBase, convertTime, use_counter, check_update import arvados.collection from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern @@ -28,20 +28,6 @@ _logger = logging.getLogger('arvados.arvados_fuse') # appear as underscores in the fuse mount.) _disallowed_filename_characters = re.compile('[\x00/]') -# '.' and '..' are not reachable if API server is newer than #6277 -def sanitize_filename(dirty): - """Replace disallowed filename characters with harmless "_".""" - if dirty is None: - return None - elif dirty == '': - return '_' - elif dirty == '.': - return '_' - elif dirty == '..': - return '__' - else: - return _disallowed_filename_characters.sub('_', dirty) - class Directory(FreshBase): """Generic directory object, backed by a dict. @@ -50,7 +36,7 @@ class Directory(FreshBase): and the value referencing a File or Directory object. """ - def __init__(self, parent_inode, inodes): + def __init__(self, parent_inode, inodes, apiconfig, enable_write): """parent_inode is the integer inode number""" super(Directory, self).__init__() @@ -60,11 +46,54 @@ class Directory(FreshBase): raise Exception("parent_inode should be an int") self.parent_inode = parent_inode self.inodes = inodes + self.apiconfig = apiconfig self._entries = {} self._mtime = time.time() + self._enable_write = enable_write + + def forward_slash_subst(self): + if not hasattr(self, '_fsns'): + self._fsns = None + config = self.apiconfig() + try: + self._fsns = config["Collections"]["ForwardSlashNameSubstitution"] + except KeyError: + # old API server with no FSNS config + self._fsns = '_' + else: + if self._fsns == '' or self._fsns == '/': + self._fsns = None + return self._fsns + + def unsanitize_filename(self, incoming): + """Replace ForwardSlashNameSubstitution value with /""" + fsns = self.forward_slash_subst() + if isinstance(fsns, str): + return incoming.replace(fsns, '/') + else: + return incoming + + def sanitize_filename(self, dirty): + """Replace disallowed filename characters according to + ForwardSlashNameSubstitution in self.api_config.""" + # '.' and '..' are not reachable if API server is newer than #6277 + if dirty is None: + return None + elif dirty == '': + return '_' + elif dirty == '.': + return '_' + elif dirty == '..': + return '__' + else: + fsns = self.forward_slash_subst() + if isinstance(fsns, str): + dirty = dirty.replace('/', fsns) + return _disallowed_filename_characters.sub('_', dirty) + - # Overriden by subclasses to implement logic to update the entries dict - # when the directory is stale + # Overridden by subclasses to implement logic to update the + # entries dict when the directory is stale @use_counter def update(self): pass @@ -133,7 +162,7 @@ class Directory(FreshBase): self._entries = {} changed = False for i in items: - name = sanitize_filename(fn(i)) + name = self.sanitize_filename(fn(i)) if name: if name in oldentries and same(oldentries[name], i): # move existing directory entry over @@ -150,12 +179,12 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) - self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding)) + self.inodes.invalidate_entry(self, i) self.inodes.del_entry(oldentries[i]) changed = True if changed: - self.inodes.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self) self._mtime = time.time() self.fresh() @@ -163,7 +192,7 @@ class Directory(FreshBase): def in_use(self): if super(Directory, self).in_use(): return True - for v in self._entries.itervalues(): + for v in self._entries.values(): if v.in_use(): return True return False @@ -171,7 +200,7 @@ class Directory(FreshBase): def has_ref(self, only_children): if super(Directory, self).has_ref(only_children): return True - for v in self._entries.itervalues(): + for v in self._entries.values(): if v.has_ref(False): return True return False @@ -182,16 +211,21 @@ class Directory(FreshBase): self._entries = {} for n in oldentries: oldentries[n].clear() - self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) self.inodes.del_entry(oldentries[n]) - self.inodes.invalidate_inode(self.inode) self.invalidate() def kernel_invalidate(self): - for n, e in self._entries.iteritems(): - self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) - e.kernel_invalidate() - self.inodes.invalidate_inode(self.inode) + # Invalidating the dentry on the parent implies invalidating all paths + # below it as well. + parent = self.inodes[self.parent_inode] + + # Find self on the parent in order to invalidate this path. + # Calling the public items() method might trigger a refresh, + # which we definitely don't want, so read the internal dict directly. + for k,v in parent._entries.items(): + if v is self: + self.inodes.invalidate_entry(parent, k) + break def mtime(self): return self._mtime @@ -236,12 +270,15 @@ class CollectionDirectoryBase(Directory): """ - def __init__(self, parent_inode, inodes, collection): - super(CollectionDirectoryBase, self).__init__(parent_inode, inodes) + def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection, collection_root): + super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write) + self.apiconfig = apiconfig self.collection = collection + self.collection_root = collection_root + self.collection_record_file = None def new_entry(self, name, item, mtime): - name = sanitize_filename(name) + name = self.sanitize_filename(name) if hasattr(item, "fuse_entry") and item.fuse_entry is not None: if item.fuse_entry.dead is not True: raise Exception("Can only reparent dead inode entry") @@ -250,59 +287,105 @@ class CollectionDirectoryBase(Directory): item.fuse_entry.dead = False self._entries[name] = item.fuse_entry elif isinstance(item, arvados.collection.RichCollectionBase): - self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item)) + self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item, self.collection_root)) self._entries[name].populate(mtime) else: - self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime)) + self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write)) item.fuse_entry = self._entries[name] def on_event(self, event, collection, name, item): + # These are events from the Collection object (ADD/DEL/MOD) + # emitted by operations on the Collection object (like + # "mkdirs" or "remove"), and by "update", which we need to + # synchronize with our FUSE objects that are assigned inodes. if collection == self.collection: - name = sanitize_filename(name) - _logger.debug("collection notify %s %s %s %s", event, collection, name, item) - with llfuse.lock: - if event == arvados.collection.ADD: - self.new_entry(name, item, self.mtime()) - elif event == arvados.collection.DEL: - ent = self._entries[name] - del self._entries[name] - self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding)) - self.inodes.del_entry(ent) - elif event == arvados.collection.MOD: - if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - self.inodes.invalidate_inode(item.fuse_entry.inode) - elif name in self._entries: - self.inodes.invalidate_inode(self._entries[name].inode) + name = self.sanitize_filename(name) + + # + # It's possible for another thread to have llfuse.lock and + # be waiting on collection.lock. Meanwhile, we released + # llfuse.lock earlier in the stack, but are still holding + # on to the collection lock, and now we need to re-acquire + # llfuse.lock. If we don't release the collection lock, + # we'll deadlock where we're holding the collection lock + # waiting for llfuse.lock and the other thread is holding + # llfuse.lock and waiting for the collection lock. + # + # The correct locking order here is to take llfuse.lock + # first, then the collection lock. + # + # Since collection.lock is an RLock, it might be locked + # multiple times, so we need to release it multiple times, + # keep a count, then re-lock it the correct number of + # times. + # + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: + with llfuse.lock: + with self.collection.lock: + if event == arvados.collection.ADD: + self.new_entry(name, item, self.mtime()) + elif event == arvados.collection.DEL: + ent = self._entries[name] + del self._entries[name] + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) + elif event == arvados.collection.MOD: + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + self.inodes.invalidate_inode(item.fuse_entry) + elif name in self._entries: + self.inodes.invalidate_inode(self._entries[name]) + + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def populate(self, mtime): self._mtime = mtime - self.collection.subscribe(self.on_event) - for entry, item in self.collection.items(): - self.new_entry(entry, item, self.mtime()) + with self.collection.lock: + self.collection.subscribe(self.on_event) + for entry, item in self.collection.items(): + self.new_entry(entry, item, self.mtime()) def writable(self): - return self.collection.writable() + return self._enable_write and self.collection.writable() @use_counter def flush(self): - with llfuse.lock_released: - self.collection.root_collection().save() + self.collection_root.flush() @use_counter @check_update def create(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) with llfuse.lock_released: self.collection.open(name, "w").close() @use_counter @check_update def mkdir(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) with llfuse.lock_released: self.collection.mkdirs(name) @use_counter @check_update def unlink(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) with llfuse.lock_released: self.collection.remove(name) self.flush() @@ -310,6 +393,8 @@ class CollectionDirectoryBase(Directory): @use_counter @check_update def rmdir(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) with llfuse.lock_released: self.collection.remove(name) self.flush() @@ -317,6 +402,9 @@ class CollectionDirectoryBase(Directory): @use_counter @check_update def rename(self, name_old, name_new, src): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) + if not isinstance(src, CollectionDirectoryBase): raise llfuse.FUSEError(errno.EPERM) @@ -346,15 +434,13 @@ class CollectionDirectoryBase(Directory): class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree representing a collection.""" - def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None): - super(CollectionDirectory, self).__init__(parent_inode, inodes, None) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None): + super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None, self) self.api = api self.num_retries = num_retries - self.collection_record_file = None - self.collection_record = None self._poll = True try: - self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2) + self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2) // 2) except: _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0]) self._poll_time = 60*60 @@ -367,70 +453,79 @@ class CollectionDirectory(CollectionDirectoryBase): self._mtime = 0 self._manifest_size = 0 if self.collection_locator: - self._writable = (uuid_pattern.match(self.collection_locator) is not None) + self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write self._updating_lock = threading.Lock() def same(self, i): return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator def writable(self): - return self.collection.writable() if self.collection is not None else self._writable + return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable) + + @use_counter + def flush(self): + if not self.writable(): + return + with llfuse.lock_released: + with self._updating_lock: + if self.collection.committed(): + self.collection.update() + else: + self.collection.save() + self.new_collection_record(self.collection.api_response()) def want_event_subscribe(self): return (uuid_pattern.match(self.collection_locator) is not None) - # Used by arv-web.py to switch the contents of the CollectionDirectory - def change_collection(self, new_locator): - """Switch the contents of the CollectionDirectory. - - Must be called with llfuse.lock held. - """ - - self.collection_locator = new_locator - self.collection_record = None - self.update() - def new_collection(self, new_collection_record, coll_reader): if self.inode: self.clear() - - self.collection_record = new_collection_record - - if self.collection_record: - self._mtime = convertTime(self.collection_record.get('modified_at')) - self.collection_locator = self.collection_record["uuid"] - if self.collection_record_file is not None: - self.collection_record_file.update(self.collection_record) - self.collection = coll_reader + self.new_collection_record(new_collection_record) self.populate(self.mtime()) + def new_collection_record(self, new_collection_record): + if not new_collection_record: + raise Exception("invalid new_collection_record") + self._mtime = convertTime(new_collection_record.get('modified_at')) + self._manifest_size = len(new_collection_record["manifest_text"]) + self.collection_locator = new_collection_record["uuid"] + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record file", self) + self.fresh() + def uuid(self): return self.collection_locator @use_counter - def update(self, to_record_version=None): + def update(self): try: - if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): + if self.collection is not None and portable_data_hash_pattern.match(self.collection_locator): + # It's immutable, nothing to update return True if self.collection_locator is None: + # No collection locator to retrieve from self.fresh() return True + new_collection_record = None try: with llfuse.lock_released: self._updating_lock.acquire() if not self.stale(): - return + return True - _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) + _logger.debug("Updating collection %s inode %s", self.collection_locator, self.inode) + coll_reader = None if self.collection is not None: - if self.collection.known_past_version(to_record_version): - _logger.debug("%s already processed %s", self.collection_locator, to_record_version) - else: - self.collection.update() + # Already have a collection object + self.collection.update() + new_collection_record = self.collection.api_response() else: + # Create a new collection object if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( self.collection_locator, self.api, self.api.keep, @@ -448,15 +543,17 @@ class CollectionDirectory(CollectionDirectoryBase): new_collection_record["portable_data_hash"] = new_collection_record["uuid"] if 'manifest_text' not in new_collection_record: new_collection_record['manifest_text'] = coll_reader.manifest_text() + if 'storage_classes_desired' not in new_collection_record: + new_collection_record['storage_classes_desired'] = coll_reader.storage_classes_desired() - if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): - self.new_collection(new_collection_record, coll_reader) - - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) # end with llfuse.lock_released, re-acquire lock - self.fresh() + if new_collection_record is not None: + if coll_reader is not None: + self.new_collection(new_collection_record, coll_reader) + else: + self.new_collection_record(new_collection_record) + return True finally: self._updating_lock.release() @@ -464,22 +561,29 @@ class CollectionDirectory(CollectionDirectoryBase): _logger.error("Error fetching collection '%s': %s", self.collection_locator, e) except arvados.errors.ArgumentError as detail: _logger.warning("arv-mount %s: error %s", self.collection_locator, detail) - if self.collection_record is not None and "manifest_text" in self.collection_record: - _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + if new_collection_record is not None and "manifest_text" in new_collection_record: + _logger.warning("arv-mount manifest_text is: %s", new_collection_record["manifest_text"]) except Exception: _logger.exception("arv-mount %s: error", self.collection_locator) - if self.collection_record is not None and "manifest_text" in self.collection_record: - _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + if new_collection_record is not None and "manifest_text" in new_collection_record: + _logger.error("arv-mount manifest_text is: %s", new_collection_record["manifest_text"]) self.invalidate() return False + @use_counter + def collection_record(self): + self.flush() + return self.collection.api_response() + @use_counter @check_update def __getitem__(self, item): if item == '.arvados#collection': if self.collection_record_file is None: - self.collection_record_file = ObjectFile(self.inode, self.collection_record) + self.collection_record_file = FuncToJSONFile( + self.inode, self.collection_record) self.inodes.add_entry(self.collection_record_file) + self.invalidate() # use lookup as a signal to force update return self.collection_record_file else: return super(CollectionDirectory, self).__getitem__(item) @@ -491,8 +595,9 @@ class CollectionDirectory(CollectionDirectoryBase): return super(CollectionDirectory, self).__contains__(k) def invalidate(self): - self.collection_record = None - self.collection_record_file = None + if self.collection_record_file is not None: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) super(CollectionDirectory, self).invalidate() def persisted(self): @@ -511,6 +616,8 @@ class CollectionDirectory(CollectionDirectoryBase): self.collection.stop_threads() def clear(self): + if self.collection is not None: + self.collection.stop_threads() super(CollectionDirectory, self).clear() self._manifest_size = 0 @@ -530,23 +637,42 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def save_new(self): pass - def __init__(self, parent_inode, inodes, api_client, num_retries): + def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None): collection = self.UnsaveableCollection( api_client=api_client, keep_client=api_client.keep, - num_retries=num_retries) + num_retries=num_retries, + storage_classes_desired=storage_classes) + # This is always enable_write=True because it never tries to + # save to the backend super(TmpCollectionDirectory, self).__init__( - parent_inode, inodes, collection) - self.collection_record_file = None + parent_inode, inodes, api_client.config, True, collection, self) self.populate(self.mtime()) def on_event(self, *args, **kwargs): super(TmpCollectionDirectory, self).on_event(*args, **kwargs) - if self.collection_record_file: + if self.collection_record_file is None: + return + + # See discussion in CollectionDirectoryBase.on_event + lockcount = 0 + try: + while True: + self.collection.lock.release() + lockcount += 1 + except RuntimeError: + pass + + try: with llfuse.lock: - self.collection_record_file.invalidate() - self.inodes.invalidate_inode(self.collection_record_file.inode) - _logger.debug("%s invalidated collection record", self) + with self.collection.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + finally: + while lockcount > 0: + self.collection.lock.acquire() + lockcount -= 1 def collection_record(self): with llfuse.lock_released: @@ -554,6 +680,7 @@ class TmpCollectionDirectory(CollectionDirectoryBase): "uuid": None, "manifest_text": self.collection.manifest_text(), "portable_data_hash": self.collection.portable_data_hash(), + "storage_classes_desired": self.collection.storage_classes_desired(), } def __contains__(self, k): @@ -576,6 +703,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def writable(self): return True + def flush(self): + pass + def want_event_subscribe(self): return False @@ -602,20 +732,22 @@ class MagicDirectory(Directory): README_TEXT = """ This directory provides access to Arvados collections as subdirectories listed by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in -the form '1234567890abcdef0123456789abcdef+123'). +the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid +(in the form 'zzzzz-j7d0g-1234567890abcde'). Note that this directory will appear empty until you attempt to access a -specific collection subdirectory (such as trying to 'cd' into it), at which -point the collection will actually be looked up on the server and the directory -will appear if it exists. +specific collection or project subdirectory (such as trying to 'cd' into it), +at which point the collection or project will actually be looked up on the server +and the directory will appear if it exists. """.lstrip() - def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False): - super(MagicDirectory, self).__init__(parent_inode, inodes) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None): + super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) self.api = api self.num_retries = num_retries self.pdh_only = pdh_only + self.storage_classes = storage_classes def __setattr__(self, name, value): super(MagicDirectory, self).__setattr__(name, value) @@ -627,7 +759,8 @@ will appear if it exists. # If we're the root directory, add an identical by_id subdirectory. if self.inode == llfuse.ROOT_INODE: self._entries['by_id'] = self.inodes.add_entry(MagicDirectory( - self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)) + self.inode, self.inodes, self.api, self.num_retries, self._enable_write, + self.pdh_only)) def __contains__(self, k): if k in self._entries: @@ -637,8 +770,19 @@ will appear if it exists. return False try: - e = self.inodes.add_entry(CollectionDirectory( - self.inode, self.inodes, self.api, self.num_retries, k)) + e = None + + if group_uuid_pattern.match(k): + project = self.api.groups().list( + filters=[['group_class', 'in', ['project','filter']], ["uuid", "=", k]]).execute(num_retries=self.num_retries) + if project[u'items_available'] == 0: + return False + e = self.inodes.add_entry(ProjectDirectory( + self.inode, self.inodes, self.api, self.num_retries, self._enable_write, + project[u'items'][0], storage_classes=self.storage_classes)) + else: + e = self.inodes.add_entry(CollectionDirectory( + self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k)) if e.update(): if k not in self._entries: @@ -647,12 +791,13 @@ will appear if it exists. self.inodes.del_entry(e) return True else: - self.inodes.invalidate_entry(self.inode, k) + self.inodes.invalidate_entry(self, k) self.inodes.del_entry(e) return False except Exception as ex: - _logger.debug('arv-mount exception keep %s', ex) - self.inodes.del_entry(e) + _logger.exception("arv-mount lookup '%s':", k) + if e is not None: + self.inodes.del_entry(e) return False def __getitem__(self, item): @@ -671,12 +816,13 @@ will appear if it exists. class TagsDirectory(Directory): """A special directory that contains as subdirectories all tags visible to the user.""" - def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60): - super(TagsDirectory, self).__init__(parent_inode, inodes) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60): + super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) self.api = api self.num_retries = num_retries self._poll = True self._poll_time = poll_time + self._extra = set() def want_event_subscribe(self): return True @@ -685,14 +831,41 @@ class TagsDirectory(Directory): def update(self): with llfuse.lock_released: tags = self.api.links().list( - filters=[['link_class', '=', 'tag']], - select=['name'], distinct=True + filters=[['link_class', '=', 'tag'], ["name", "!=", ""]], + select=['name'], distinct=True, limit=1000 ).execute(num_retries=self.num_retries) if "items" in tags: - self.merge(tags['items'], + self.merge(tags['items']+[{"name": n} for n in self._extra], lambda i: i['name'], lambda a, i: a.tag == i['name'], - lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)) + lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, + i['name'], poll=self._poll, poll_time=self._poll_time)) + + @use_counter + @check_update + def __getitem__(self, item): + if super(TagsDirectory, self).__contains__(item): + return super(TagsDirectory, self).__getitem__(item) + with llfuse.lock_released: + tags = self.api.links().list( + filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1 + ).execute(num_retries=self.num_retries) + if tags["items"]: + self._extra.add(item) + self.update() + return super(TagsDirectory, self).__getitem__(item) + + @use_counter + @check_update + def __contains__(self, k): + if super(TagsDirectory, self).__contains__(k): + return True + try: + self[k] + return True + except KeyError: + pass + return False class TagDirectory(Directory): @@ -700,9 +873,9 @@ class TagDirectory(Directory): to the user that are tagged with a particular tag. """ - def __init__(self, parent_inode, inodes, api, num_retries, tag, + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag, poll=False, poll_time=60): - super(TagDirectory, self).__init__(parent_inode, inodes) + super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) self.api = api self.num_retries = num_retries self.tag = tag @@ -724,15 +897,15 @@ class TagDirectory(Directory): self.merge(taggedcollections['items'], lambda i: i['head_uuid'], lambda a, i: a.collection_locator == i['head_uuid'], - lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])) + lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])) class ProjectDirectory(Directory): """A special directory that contains the contents of a project.""" - def __init__(self, parent_inode, inodes, api, num_retries, project_object, - poll=False, poll_time=60): - super(ProjectDirectory, self).__init__(parent_inode, inodes) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object, + poll=True, poll_time=3, storage_classes=None): + super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) self.api = api self.num_retries = num_retries self.project_object = project_object @@ -742,18 +915,21 @@ class ProjectDirectory(Directory): self._poll_time = poll_time self._updating_lock = threading.Lock() self._current_user = None + self._full_listing = False + self.storage_classes = storage_classes def want_event_subscribe(self): return True def createDirectory(self, i): if collection_uuid_pattern.match(i['uuid']): - return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i) + return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i) elif group_uuid_pattern.match(i['uuid']): - return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time) + return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, + i, self._poll, self._poll_time, self.storage_classes) elif link_uuid_pattern.match(i['uuid']): if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']): - return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']) + return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']) else: return None elif uuid_pattern.match(i['uuid']): @@ -764,27 +940,35 @@ class ProjectDirectory(Directory): def uuid(self): return self.project_uuid + def items(self): + self._full_listing = True + return super(ProjectDirectory, self).items() + + def namefn(self, i): + if 'name' in i: + if i['name'] is None or len(i['name']) == 0: + return None + elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])): + # collection or subproject + return i['name'] + elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': + # name link + return i['name'] + elif 'kind' in i and i['kind'].startswith('arvados#'): + # something else + return "{}.{}".format(i['name'], i['kind'][8:]) + else: + return None + + @use_counter def update(self): if self.project_object_file == None: self.project_object_file = ObjectFile(self.inode, self.project_object) self.inodes.add_entry(self.project_object_file) - def namefn(i): - if 'name' in i: - if i['name'] is None or len(i['name']) == 0: - return None - elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']): - # collection or subproject - return i['name'] - elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': - # name link - return i['name'] - elif 'kind' in i and i['kind'].startswith('arvados#'): - # something else - return "{}.{}".format(i['name'], i['kind'][8:]) - else: - return None + if not self._full_listing: + return True def samefn(a, i): if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory): @@ -805,40 +989,89 @@ class ProjectDirectory(Directory): elif user_uuid_pattern.match(self.project_uuid): self.project_object = self.api.users().get( uuid=self.project_uuid).execute(num_retries=self.num_retries) + # do this in 2 steps until #17424 is fixed + contents = list(arvados.util.keyset_list_all(self.api.groups().contents, + order_key="uuid", + num_retries=self.num_retries, + uuid=self.project_uuid, + filters=[["uuid", "is_a", "arvados#group"], + ["groups.group_class", "in", ["project","filter"]]])) + contents.extend(filter(lambda i: i["current_version_uuid"] == i["uuid"], + arvados.util.keyset_list_all(self.api.groups().contents, + order_key="uuid", + num_retries=self.num_retries, + uuid=self.project_uuid, + filters=[["uuid", "is_a", "arvados#collection"]]))) - contents = arvados.util.list_all(self.api.groups().contents, - self.num_retries, uuid=self.project_uuid) # end with llfuse.lock_released, re-acquire lock self.merge(contents, - namefn, + self.namefn, samefn, self.createDirectory) + return True finally: self._updating_lock.release() + def _add_entry(self, i, name): + ent = self.createDirectory(i) + self._entries[name] = self.inodes.add_entry(ent) + return self._entries[name] + @use_counter @check_update - def __getitem__(self, item): - if item == '.arvados#project': + def __getitem__(self, k): + if k == '.arvados#project': return self.project_object_file - else: - return super(ProjectDirectory, self).__getitem__(item) + elif self._full_listing or super(ProjectDirectory, self).__contains__(k): + return super(ProjectDirectory, self).__getitem__(k) + with llfuse.lock_released: + k2 = self.unsanitize_filename(k) + if k2 == k: + namefilter = ["name", "=", k] + else: + namefilter = ["name", "in", [k, k2]] + contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid], + ["group_class", "in", ["project","filter"]], + namefilter], + limit=2).execute(num_retries=self.num_retries)["items"] + if not contents: + contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid], + namefilter], + limit=2).execute(num_retries=self.num_retries)["items"] + if contents: + if len(contents) > 1 and contents[1]['name'] == k: + # If "foo/bar" and "foo[SUBST]bar" both exist, use + # "foo[SUBST]bar". + contents = [contents[1]] + name = self.sanitize_filename(self.namefn(contents[0])) + if name != k: + raise KeyError(k) + return self._add_entry(contents[0], name) + + # Didn't find item + raise KeyError(k) def __contains__(self, k): if k == '.arvados#project': return True - else: - return super(ProjectDirectory, self).__contains__(k) + try: + self[k] + return True + except KeyError: + pass + return False @use_counter @check_update def writable(self): + if not self._enable_write: + return False with llfuse.lock_released: if not self._current_user: self._current_user = self.api.users().current().execute(num_retries=self.num_retries) - return self._current_user["uuid"] in self.project_object["writable_by"] + return self._current_user["uuid"] in self.project_object.get("writable_by", []) def persisted(self): return True @@ -846,11 +1079,21 @@ class ProjectDirectory(Directory): @use_counter @check_update def mkdir(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) + try: with llfuse.lock_released: - self.api.collections().create(body={"owner_uuid": self.project_uuid, - "name": name, - "manifest_text": ""}).execute(num_retries=self.num_retries) + c = { + "owner_uuid": self.project_uuid, + "name": name, + "manifest_text": "" } + if self.storage_classes is not None: + c["storage_classes_desired"] = self.storage_classes + try: + self.api.collections().create(body=c).execute(num_retries=self.num_retries) + except Exception as e: + raise self.invalidate() except apiclient_errors.Error as error: _logger.error(error) @@ -859,6 +1102,9 @@ class ProjectDirectory(Directory): @use_counter @check_update def rmdir(self, name): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) + if name not in self: raise llfuse.FUSEError(errno.ENOENT) if not isinstance(self[name], CollectionDirectory): @@ -872,6 +1118,9 @@ class ProjectDirectory(Directory): @use_counter @check_update def rename(self, name_old, name_new, src): + if not self.writable(): + raise llfuse.FUSEError(errno.EROFS) + if not isinstance(src, ProjectDirectory): raise llfuse.FUSEError(errno.EPERM) @@ -894,78 +1143,155 @@ class ProjectDirectory(Directory): # Acually move the entry from source directory to this directory. del src._entries[name_old] self._entries[name_new] = ent - self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding)) + self.inodes.invalidate_entry(src, name_old) + + @use_counter + def child_event(self, ev): + properties = ev.get("properties") or {} + old_attrs = properties.get("old_attributes") or {} + new_attrs = properties.get("new_attributes") or {} + old_attrs["uuid"] = ev["object_uuid"] + new_attrs["uuid"] = ev["object_uuid"] + old_name = self.sanitize_filename(self.namefn(old_attrs)) + new_name = self.sanitize_filename(self.namefn(new_attrs)) + + # create events will have a new name, but not an old name + # delete events will have an old name, but not a new name + # update events will have an old and new name, and they may be same or different + # if they are the same, an unrelated field changed and there is nothing to do. + + if old_attrs.get("owner_uuid") != self.project_uuid: + # Was moved from somewhere else, so don't try to remove entry. + old_name = None + if ev.get("object_owner_uuid") != self.project_uuid: + # Was moved to somewhere else, so don't try to add entry + new_name = None + + if old_attrs.get("is_trashed"): + # Was previously deleted + old_name = None + if new_attrs.get("is_trashed"): + # Has been deleted + new_name = None + + if new_name != old_name: + ent = None + if old_name in self._entries: + ent = self._entries[old_name] + del self._entries[old_name] + self.inodes.invalidate_entry(self, old_name) + + if new_name: + if ent is not None: + self._entries[new_name] = ent + else: + self._add_entry(new_attrs, new_name) + elif ent is not None: + self.inodes.del_entry(ent) class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" - def __init__(self, parent_inode, inodes, api, num_retries, exclude, - poll=False, poll_time=60): - super(SharedDirectory, self).__init__(parent_inode, inodes) + def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude, + poll=False, poll_time=60, storage_classes=None): + super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write) self.api = api self.num_retries = num_retries self.current_user = api.users().current().execute(num_retries=num_retries) self._poll = True self._poll_time = poll_time + self._updating_lock = threading.Lock() + self.storage_classes = storage_classes @use_counter def update(self): - with llfuse.lock_released: - all_projects = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['group_class','=','project']]) - objects = {} - for ob in all_projects: - objects[ob['uuid']] = ob - - roots = [] - root_owners = {} - for ob in all_projects: - if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects: - roots.append(ob) - root_owners[ob['owner_uuid']] = True - - lusers = arvados.util.list_all( - self.api.users().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - lgroups = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - - users = {} - groups = {} - - for l in lusers: - objects[l["uuid"]] = l - for l in lgroups: - objects[l["uuid"]] = l - - contents = {} - for r in root_owners: - if r in objects: - obr = objects[r] - if obr.get("name"): - contents[obr["name"]] = obr - #elif obr.get("username"): - # contents[obr["username"]] = obr - elif "first_name" in obr: - contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr - - - for r in roots: - if r['owner_uuid'] not in objects: - contents[r['name']] = r - - # end with llfuse.lock_released, re-acquire lock - try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + contents = {} + roots = [] + root_owners = set() + objects = {} + + methods = self.api._rootDesc.get('resources')["groups"]['methods'] + if 'httpMethod' in methods.get('shared', {}): + page = [] + while True: + resp = self.api.groups().shared(filters=[['group_class', 'in', ['project','filter']]]+page, + order="uuid", + limit=10000, + count="none", + include="owner_uuid").execute() + if not resp["items"]: + break + page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]] + for r in resp["items"]: + objects[r["uuid"]] = r + roots.append(r["uuid"]) + for r in resp["included"]: + objects[r["uuid"]] = r + root_owners.add(r["uuid"]) + else: + all_projects = list(arvados.util.keyset_list_all( + self.api.groups().list, + order_key="uuid", + num_retries=self.num_retries, + filters=[['group_class','in',['project','filter']]], + select=["uuid", "owner_uuid"])) + for ob in all_projects: + objects[ob['uuid']] = ob + + current_uuid = self.current_user['uuid'] + for ob in all_projects: + if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects: + roots.append(ob['uuid']) + root_owners.add(ob['owner_uuid']) + + lusers = arvados.util.keyset_list_all( + self.api.users().list, + order_key="uuid", + num_retries=self.num_retries, + filters=[['uuid','in', list(root_owners)]]) + lgroups = arvados.util.keyset_list_all( + self.api.groups().list, + order_key="uuid", + num_retries=self.num_retries, + filters=[['uuid','in', list(root_owners)+roots]]) + + for l in lusers: + objects[l["uuid"]] = l + for l in lgroups: + objects[l["uuid"]] = l + + for r in root_owners: + if r in objects: + obr = objects[r] + if obr.get("name"): + contents[obr["name"]] = obr + elif "first_name" in obr: + contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr + + for r in roots: + if r in objects: + obr = objects[r] + if obr['owner_uuid'] not in objects: + contents[obr["name"]] = obr + + # end with llfuse.lock_released, re-acquire lock + self.merge(contents.items(), lambda i: i[0], lambda a, i: a.uuid() == i[1]['uuid'], - lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) + lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, + i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes)) except Exception: - _logger.exception() + _logger.exception("arv-mount shared dir error") + finally: + self._updating_lock.release() def want_event_subscribe(self): return True