X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d2f68bd1e108c3f2dda2322c427050d019b17e04..a04932c206425f4eb2f46e36f0acf4b7b194b865:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 898af31d26..9b93187e4f 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -5,6 +5,9 @@ import llfuse import arvados import apiclient import functools +import threading +from apiclient import errors as apiclient_errors +import errno from fusefile import StringFile, ObjectFile, FuseArvadosFile from fresh import FreshBase, convertTime, use_counter @@ -72,7 +75,7 @@ class Directory(FreshBase): try: self.update() except apiclient.errors.HttpError as e: - _logger.debug(e) + _logger.warn(e) @use_counter def __getitem__(self, item): @@ -89,6 +92,11 @@ class Directory(FreshBase): self.checkupdate() return k in self._entries + @use_counter + def __len__(self): + self.checkupdate() + return len(self._entries) + def fresh(self): self.inodes.touch(self) super(Directory, self).fresh() @@ -125,6 +133,7 @@ class Directory(FreshBase): self._entries[name] = oldentries[name] del oldentries[name] else: + _logger.debug("Adding entry '%s' to inode %i", name, self.inode) # create new directory entry ent = new_entry(i) if ent is not None: @@ -133,6 +142,7 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: + _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode) llfuse.invalidate_entry(self.inode, str(i)) self.inodes.del_entry(oldentries[i]) changed = True @@ -171,6 +181,21 @@ class Directory(FreshBase): def flush(self): pass + def create(self, name): + raise NotImplementedError() + + def mkdir(self, name): + raise NotImplementedError() + + def unlink(self, name): + raise NotImplementedError() + + def rmdir(self, name): + raise NotImplementedError() + + def rename(self, 
name_old, name_new, src): + raise NotImplementedError() + class CollectionDirectoryBase(Directory): def __init__(self, parent_inode, inodes, collection): super(CollectionDirectoryBase, self).__init__(parent_inode, inodes) @@ -178,15 +203,23 @@ class CollectionDirectoryBase(Directory): def new_entry(self, name, item, mtime): name = sanitize_filename(name) - if isinstance(item, arvados.collection.RichCollectionBase): + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + if item.fuse_entry.dead is not True: + raise Exception("Can only reparent dead inode entry") + if item.fuse_entry.inode is None: + raise Exception("Reparented entry must still have valid inode") + item.fuse_entry.dead = False + self._entries[name] = item.fuse_entry + elif isinstance(item, arvados.collection.RichCollectionBase): self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item)) self._entries[name].populate(mtime) else: self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime)) + item.fuse_entry = self._entries[name] def on_event(self, event, collection, name, item): - _logger.warn("Got event! 
%s %s %s %s", event, collection, name, item) if collection == self.collection: + _logger.debug("%s %s %s %s", event, collection, name, item) with llfuse.lock: if event == arvados.collection.ADD: self.new_entry(name, item, self.mtime()) @@ -196,9 +229,10 @@ class CollectionDirectoryBase(Directory): llfuse.invalidate_entry(self.inode, name) self.inodes.del_entry(ent) elif event == arvados.collection.MOD: - ent = self._entries[name] - llfuse.invalidate_inode(ent.inode) - _logger.warn("Finished handling event") + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + llfuse.invalidate_inode(item.fuse_entry.inode) + elif name in self._entries: + llfuse.invalidate_inode(self._entries[name].inode) def populate(self, mtime): self._mtime = mtime @@ -210,7 +244,47 @@ class CollectionDirectoryBase(Directory): return self.collection.writable() def flush(self): - self.collection.root_collection().save() + with llfuse.lock_released: + self.collection.root_collection().save() + + def create(self, name): + with llfuse.lock_released: + self.collection.open(name, "w").close() + + def mkdir(self, name): + with llfuse.lock_released: + self.collection.mkdirs(name) + + def unlink(self, name): + with llfuse.lock_released: + self.collection.remove(name) + + def rmdir(self, name): + with llfuse.lock_released: + self.collection.remove(name) + + def rename(self, name_old, name_new, src): + if not isinstance(src, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + ent = src[name_old] + tgt = self[name_new] + if isinstance(FuseArvadosFile, ent) and isinstance(FuseArvadosFile, tgt): + pass + elif isinstance(CollectionDirectoryBase, ent) and isinstance(CollectionDirectoryBase, tgt): + if len(tgt) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + elif isinstance(CollectionDirectoryBase, ent) and isinstance(FuseArvadosFile, tgt): + raise llfuse.FUSEError(errno.ENOTDIR) + elif isinstance(FuseArvadosFile, ent) and isinstance(CollectionDirectoryBase, 
tgt): + raise llfuse.FUSEError(errno.EISDIR) + + with llfuse.lock_released: + self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) + self.flush() + src.flush() + class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree holding a collection.""" @@ -230,6 +304,7 @@ class CollectionDirectory(CollectionDirectoryBase): self._manifest_size = 0 if self.collection_locator: self._writable = (uuid_pattern.match(self.collection_locator) is not None) + self._updating_lock = threading.Lock() def same(self, i): return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator @@ -275,38 +350,45 @@ class CollectionDirectory(CollectionDirectoryBase): self.fresh() return True - with llfuse.lock_released: - _logger.debug("Updating %s", self.collection_locator) - if self.collection: - self.collection.update() - else: - if uuid_pattern.match(self.collection_locator): - coll_reader = arvados.collection.Collection( - self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) + try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + _logger.debug("Updating %s", self.collection_locator) + if self.collection: + self.collection.update() else: - coll_reader = arvados.collection.CollectionReader( - self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) - new_collection_record = coll_reader.api_response() or {} - # If the Collection only exists in Keep, there will be no API - # response. Fill in the fields we need. 
- if 'uuid' not in new_collection_record: - new_collection_record['uuid'] = self.collection_locator - if "portable_data_hash" not in new_collection_record: - new_collection_record["portable_data_hash"] = new_collection_record["uuid"] - if 'manifest_text' not in new_collection_record: - new_collection_record['manifest_text'] = coll_reader.manifest_text() - - if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): - self.new_collection(new_collection_record, coll_reader) - - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) - # end with llfuse.lock_released, re-acquire lock + if uuid_pattern.match(self.collection_locator): + coll_reader = arvados.collection.Collection( + self.collection_locator, self.api, self.api.keep, + num_retries=self.num_retries) + else: + coll_reader = arvados.collection.CollectionReader( + self.collection_locator, self.api, self.api.keep, + num_retries=self.num_retries) + new_collection_record = coll_reader.api_response() or {} + # If the Collection only exists in Keep, there will be no API + # response. Fill in the fields we need. 
+ if 'uuid' not in new_collection_record: + new_collection_record['uuid'] = self.collection_locator + if "portable_data_hash" not in new_collection_record: + new_collection_record["portable_data_hash"] = new_collection_record["uuid"] + if 'manifest_text' not in new_collection_record: + new_collection_record['manifest_text'] = coll_reader.manifest_text() + + if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): + self.new_collection(new_collection_record, coll_reader) + + self._manifest_size = len(coll_reader.manifest_text()) + _logger.debug("%s manifest_size %i", self, self._manifest_size) + # end with llfuse.lock_released, re-acquire lock - self.fresh() - return True + self.fresh() + return True + finally: + self._updating_lock.release() except arvados.errors.NotFoundError: _logger.exception("arv-mount %s: error", self.collection_locator) except arvados.errors.ArgumentError as detail: @@ -344,6 +426,9 @@ class CollectionDirectory(CollectionDirectoryBase): return (self.collection_locator is not None) def objsize(self): + # This is an empirically-derived heuristic to estimate the memory used + # to store this collection's metadata. Calculating the memory + # footprint directly would be more accurate, but also more complicated. 
return self._manifest_size * 128 class MagicDirectory(Directory): @@ -486,9 +571,11 @@ class ProjectDirectory(Directory): self.num_retries = num_retries self.project_object = project_object self.project_object_file = None - self.uuid = project_object['uuid'] + self.project_uuid = project_object['uuid'] self._poll = poll self._poll_time = poll_time + self._updating_lock = threading.Lock() + self._current_user = None def createDirectory(self, i): if collection_uuid_pattern.match(i['uuid']): @@ -506,7 +593,7 @@ class ProjectDirectory(Directory): return None def uuid(self): - return self.uuid + return self.project_uuid def update(self): if self.project_object_file == None: @@ -530,31 +617,36 @@ class ProjectDirectory(Directory): return None def samefn(a, i): - if isinstance(a, CollectionDirectory): - return a.collection_locator == i['uuid'] - elif isinstance(a, ProjectDirectory): - return a.uuid == i['uuid'] + if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory): + return a.uuid() == i['uuid'] elif isinstance(a, ObjectFile): - return a.uuid == i['uuid'] and not a.stale() + return a.uuid() == i['uuid'] and not a.stale() return False - with llfuse.lock_released: - if group_uuid_pattern.match(self.uuid): - self.project_object = self.api.groups().get( - uuid=self.uuid).execute(num_retries=self.num_retries) - elif user_uuid_pattern.match(self.uuid): - self.project_object = self.api.users().get( - uuid=self.uuid).execute(num_retries=self.num_retries) + try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return - contents = arvados.util.list_all(self.api.groups().contents, - self.num_retries, uuid=self.uuid) + if group_uuid_pattern.match(self.project_uuid): + self.project_object = self.api.groups().get( + uuid=self.project_uuid).execute(num_retries=self.num_retries) + elif user_uuid_pattern.match(self.project_uuid): + self.project_object = self.api.users().get( + 
uuid=self.project_uuid).execute(num_retries=self.num_retries) - # end with llfuse.lock_released, re-acquire lock + contents = arvados.util.list_all(self.api.groups().contents, + self.num_retries, uuid=self.project_uuid) + + # end with llfuse.lock_released, re-acquire lock - self.merge(contents, - namefn, - samefn, - self.createDirectory) + self.merge(contents, + namefn, + samefn, + self.createDirectory) + finally: + self._updating_lock.release() def __getitem__(self, item): self.checkupdate() @@ -569,9 +661,61 @@ class ProjectDirectory(Directory): else: return super(ProjectDirectory, self).__contains__(k) + def writable(self): + with llfuse.lock_released: + if not self._current_user: + self._current_user = self.api.users().current().execute(num_retries=self.num_retries) + return self._current_user["uuid"] in self.project_object["writable_by"] + def persisted(self): - return False + return True + + def mkdir(self, name): + try: + with llfuse.lock_released: + self.api.collections().create(body={"owner_uuid": self.project_uuid, + "name": name, + "manifest_text": ""}).execute(num_retries=self.num_retries) + self.invalidate() + except apiclient_errors.Error as error: + _logger.error(error) + raise llfuse.FUSEError(errno.EEXIST) + + def rmdir(self, name): + if name not in self: + raise llfuse.FUSEError(errno.ENOENT) + if not isinstance(self[name], CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + if len(self[name]) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + with llfuse.lock_released: + self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries) + self.invalidate() + def rename(self, name_old, name_new, src): + if not isinstance(src, ProjectDirectory): + raise llfuse.FUSEError(errno.EPERM) + + ent = src[name_old] + + if not isinstance(ent, CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + # POSIX semantics for replacing one directory with another is + # tricky (the target directory must 
be empty, the operation must be + # atomic which isn't possible with the Arvados API as of this + # writing) so don't support that. + raise llfuse.FUSEError(errno.EPERM) + + self.api.collections().update(uuid=ent.uuid(), + body={"owner_uuid": self.uuid(), + "name": name_new}).execute(num_retries=self.num_retries) + + # Actually move the entry from source directory to this directory. + del src._entries[name_old] + self._entries[name_new] = ent + llfuse.invalidate_entry(src.inode, name_old) class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" @@ -620,11 +764,14 @@ class SharedDirectory(Directory): for r in root_owners: if r in objects: obr = objects[r] - if "name" in obr: + if obr.get("name"): contents[obr["name"]] = obr - if "first_name" in obr: + #elif obr.get("username"): + # contents[obr["username"]] = obr + elif "first_name" in obr: contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr + for r in roots: if r['owner_uuid'] not in objects: contents[r['name']] = r @@ -634,7 +781,7 @@ class SharedDirectory(Directory): try: self.merge(contents.items(), lambda i: i[0], - lambda a, i: a.uuid == i[1]['uuid'], + lambda a, i: a.uuid() == i[1]['uuid'], lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) except Exception: _logger.exception()