X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a592559241a69087a35361177d9aa81c8a2c3e79..f42c7e3d3344104206ca0b8669e2b07a6b30388e:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index b44950f25a..16b3bb2cdb 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -6,9 +6,11 @@ import arvados import apiclient import functools import threading +from apiclient import errors as apiclient_errors +import errno from fusefile import StringFile, ObjectFile, FuseArvadosFile -from fresh import FreshBase, convertTime, use_counter +from fresh import FreshBase, convertTime, use_counter, check_update import arvados.collection from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern @@ -43,9 +45,10 @@ class Directory(FreshBase): """ def __init__(self, parent_inode, inodes): + """parent_inode is the integer inode number""" + super(Directory, self).__init__() - """parent_inode is the integer inode number""" self.inode = None if not isinstance(parent_inode, int): raise Exception("parent_inode should be an int") @@ -76,20 +79,25 @@ class Directory(FreshBase): _logger.warn(e) @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() return self._entries[item] @use_counter + @check_update def items(self): - self.checkupdate() return list(self._entries.items()) @use_counter + @check_update def __contains__(self, k): - self.checkupdate() return k in self._entries + @use_counter + @check_update + def __len__(self): + return len(self._entries) + def fresh(self): self.inodes.touch(self) super(Directory, self).fresh() @@ -135,13 +143,13 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: - _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode) - 
llfuse.invalidate_entry(self.inode, str(i)) + _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) + self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding)) self.inodes.del_entry(oldentries[i]) changed = True if changed: - llfuse.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self.inode) self._mtime = time.time() self.fresh() @@ -157,9 +165,9 @@ class Directory(FreshBase): self._entries = oldentries return False for n in oldentries: - llfuse.invalidate_entry(self.inode, str(n)) + self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) self.inodes.del_entry(oldentries[n]) - llfuse.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self.inode) self.invalidate() return True else: @@ -174,22 +182,37 @@ class Directory(FreshBase): def flush(self): pass - def create(self): + def create(self, name): raise NotImplementedError() - def mkdir(self): + def mkdir(self, name): raise NotImplementedError() - def unlink(self): + def unlink(self, name): raise NotImplementedError() - def rmdir(self): + def rmdir(self, name): raise NotImplementedError() - def rename(self): + def rename(self, name_old, name_new, src): raise NotImplementedError() + class CollectionDirectoryBase(Directory): + """Represent an Arvados Collection as a directory. + + This class is used for Subcollections, and is also the base class for + CollectionDirectory, which implements collection loading/saving on + Collection records. + + Most operations act only on the underlying Arvados `Collection` object. The + `Collection` object signals via a notify callback to + `CollectionDirectoryBase.on_event` that an item was added, removed or + modified. FUSE inodes and directory entries are created, deleted or + invalidated in response to these events. 
+ + """ + def __init__(self, parent_inode, inodes, collection): super(CollectionDirectoryBase, self).__init__(parent_inode, inodes) self.collection = collection @@ -212,20 +235,21 @@ class CollectionDirectoryBase(Directory): def on_event(self, event, collection, name, item): if collection == self.collection: - _logger.debug("%s %s %s %s", event, collection, name, item) + name = sanitize_filename(name) + _logger.debug("collection notify %s %s %s %s", event, collection, name, item) with llfuse.lock: if event == arvados.collection.ADD: self.new_entry(name, item, self.mtime()) elif event == arvados.collection.DEL: ent = self._entries[name] del self._entries[name] - llfuse.invalidate_entry(self.inode, name) + self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding)) self.inodes.del_entry(ent) elif event == arvados.collection.MOD: if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - llfuse.invalidate_inode(item.fuse_entry.inode) + self.inodes.invalidate_inode(item.fuse_entry.inode) elif name in self._entries: - llfuse.invalidate_inode(self._entries[name].inode) + self.inodes.invalidate_inode(self._entries[name].inode) def populate(self, mtime): self._mtime = mtime @@ -236,31 +260,64 @@ class CollectionDirectoryBase(Directory): def writable(self): return self.collection.writable() + @use_counter def flush(self): - self.collection.root_collection().save() + with llfuse.lock_released: + self.collection.root_collection().save() + @use_counter + @check_update def create(self, name): - self.collection.open(name, "w").close() + with llfuse.lock_released: + self.collection.open(name, "w").close() + @use_counter + @check_update def mkdir(self, name): - self.collection.mkdirs(name) + with llfuse.lock_released: + self.collection.mkdirs(name) + @use_counter + @check_update def unlink(self, name): - self.collection.remove(name) + with llfuse.lock_released: + self.collection.remove(name) + self.flush() + @use_counter + @check_update def rmdir(self, 
name): - self.collection.remove(name) + with llfuse.lock_released: + self.collection.remove(name) + self.flush() + @use_counter + @check_update def rename(self, name_old, name_new, src): if not isinstance(src, CollectionDirectoryBase): raise llfuse.FUSEError(errno.EPERM) - self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) + + if name_new in self: + ent = src[name_old] + tgt = self[name_new] + if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile): + pass + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase): + if len(tgt) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile): + raise llfuse.FUSEError(errno.ENOTDIR) + elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EISDIR) + + with llfuse.lock_released: + self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) self.flush() src.flush() class CollectionDirectory(CollectionDirectoryBase): - """Represents the root of a directory tree holding a collection.""" + """Represents the root of a directory tree representing a collection.""" def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None): super(CollectionDirectory, self).__init__(parent_inode, inodes, None) @@ -314,7 +371,8 @@ class CollectionDirectory(CollectionDirectoryBase): def uuid(self): return self.collection_locator - def update(self): + @use_counter + def update(self, to_record_version=None): try: if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): return True @@ -329,9 +387,12 @@ class CollectionDirectory(CollectionDirectoryBase): if not self.stale(): return - _logger.debug("Updating %s", self.collection_locator) - if self.collection: - self.collection.update() + 
_logger.debug("Updating %s", to_record_version) + if self.collection is not None: + if self.collection.known_past_version(to_record_version): + _logger.debug("%s already processed %s", self.collection_locator, to_record_version) + else: + self.collection.update() else: if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( @@ -374,8 +435,9 @@ class CollectionDirectory(CollectionDirectoryBase): _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) return False + @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() if item == '.arvados#collection': if self.collection_record_file is None: self.collection_record_file = ObjectFile(self.inode, self.collection_record) @@ -404,6 +466,13 @@ class CollectionDirectory(CollectionDirectoryBase): # footprint directly would be more accurate, but also more complicated. return self._manifest_size * 128 + def finalize(self): + if self.collection is not None: + if self.writable(): + self.collection.save() + self.collection.stop_threads() + + class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. 
@@ -493,6 +562,7 @@ class TagsDirectory(RecursiveInvalidateDirectory): self._poll = True self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: tags = self.api.links().list( @@ -520,6 +590,7 @@ class TagDirectory(Directory): self._poll = poll self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: taggedcollections = self.api.links().list( @@ -548,6 +619,7 @@ class ProjectDirectory(Directory): self._poll = poll self._poll_time = poll_time self._updating_lock = threading.Lock() + self._current_user = None def createDirectory(self, i): if collection_uuid_pattern.match(i['uuid']): @@ -567,6 +639,7 @@ class ProjectDirectory(Directory): def uuid(self): return self.project_uuid + @use_counter def update(self): if self.project_object_file == None: self.project_object_file = ObjectFile(self.inode, self.project_object) @@ -620,8 +693,9 @@ class ProjectDirectory(Directory): finally: self._updating_lock.release() + @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() if item == '.arvados#project': return self.project_object_file else: @@ -633,9 +707,70 @@ class ProjectDirectory(Directory): else: return super(ProjectDirectory, self).__contains__(k) + @use_counter + @check_update + def writable(self): + with llfuse.lock_released: + if not self._current_user: + self._current_user = self.api.users().current().execute(num_retries=self.num_retries) + return self._current_user["uuid"] in self.project_object["writable_by"] + def persisted(self): return True + @use_counter + @check_update + def mkdir(self, name): + try: + with llfuse.lock_released: + self.api.collections().create(body={"owner_uuid": self.project_uuid, + "name": name, + "manifest_text": ""}).execute(num_retries=self.num_retries) + self.invalidate() + except apiclient_errors.Error as error: + _logger.error(error) + raise llfuse.FUSEError(errno.EEXIST) + + @use_counter + @check_update + def rmdir(self, name): + if name not in self: 
+ raise llfuse.FUSEError(errno.ENOENT) + if not isinstance(self[name], CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + if len(self[name]) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + with llfuse.lock_released: + self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries) + self.invalidate() + + @use_counter + @check_update + def rename(self, name_old, name_new, src): + if not isinstance(src, ProjectDirectory): + raise llfuse.FUSEError(errno.EPERM) + + ent = src[name_old] + + if not isinstance(ent, CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + # POSIX semantics for replacing one directory with another is + # tricky (the target directory must be empty, the operation must be + # atomic which isn't possible with the Arvados API as of this + # writing) so don't support that. + raise llfuse.FUSEError(errno.EPERM) + + self.api.collections().update(uuid=ent.uuid(), + body={"owner_uuid": self.uuid(), + "name": name_new}).execute(num_retries=self.num_retries) + + # Actually move the entry from source directory to this directory. + del src._entries[name_old] + self._entries[name_new] = ent + self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding)) + class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" @@ -649,6 +784,7 @@ class SharedDirectory(Directory): self._poll = True self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: all_projects = arvados.util.list_all(