X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/32f8850d66e388fe0c086f8e1e4c74658c34fad0..d9f8f46ccd5a418dcf7b5f43aeb59cd2d9d424ba:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index d90bd8b398..a2135b3def 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -8,9 +8,10 @@ import functools import threading from apiclient import errors as apiclient_errors import errno +import time -from fusefile import StringFile, ObjectFile, FuseArvadosFile -from fresh import FreshBase, convertTime, use_counter +from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile +from fresh import FreshBase, convertTime, use_counter, check_update import arvados.collection from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern @@ -23,6 +24,7 @@ _logger = logging.getLogger('arvados.arvados_fuse') # appear as underscores in the fuse mount.) _disallowed_filename_characters = re.compile('[\x00/]') +# '.' and '..' are not reachable if API server is newer than #6277 def sanitize_filename(dirty): """Replace disallowed filename characters with harmless "_".""" if dirty is None: @@ -45,9 +47,10 @@ class Directory(FreshBase): """ def __init__(self, parent_inode, inodes): + """parent_inode is the integer inode number""" + super(Directory, self).__init__() - """parent_inode is the integer inode number""" self.inode = None if not isinstance(parent_inode, int): raise Exception("parent_inode should be an int") @@ -78,23 +81,23 @@ class Directory(FreshBase): _logger.warn(e) @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() return self._entries[item] @use_counter + @check_update def items(self): - self.checkupdate() return list(self._entries.items()) @use_counter + @check_update def __contains__(self, k): - self.checkupdate() return k in self._entries @use_counter + @check_update def __len__(self): - self.checkupdate() return len(self._entries) def fresh(self): @@ -142,13 +145,13 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: - _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode) - llfuse.invalidate_entry(self.inode, str(i)) + _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) + self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding)) self.inodes.del_entry(oldentries[i]) changed = True if changed: - llfuse.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self.inode) self._mtime = time.time() self.fresh() @@ -164,9 +167,9 @@ class Directory(FreshBase): self._entries = oldentries return False for n in oldentries: - llfuse.invalidate_entry(self.inode, str(n)) + self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) self.inodes.del_entry(oldentries[n]) - llfuse.invalidate_inode(self.inode) + self.inodes.invalidate_inode(self.inode) self.invalidate() return True else: @@ -196,7 +199,22 @@ class Directory(FreshBase): def rename(self, name_old, name_new, src): raise NotImplementedError() + class CollectionDirectoryBase(Directory): + """Represent an Arvados Collection as a directory. + + This class is used for Subcollections, and is also the base class for + CollectionDirectory, which implements collection loading/saving on + Collection records. + + Most operations act only the underlying Arvados `Collection` object. The + `Collection` object signals via a notify callback to + `CollectionDirectoryBase.on_event` that an item was added, removed or + modified. FUSE inodes and directory entries are created, deleted or + invalidated in response to these events. + + """ + def __init__(self, parent_inode, inodes, collection): super(CollectionDirectoryBase, self).__init__(parent_inode, inodes) self.collection = collection @@ -219,20 +237,21 @@ class CollectionDirectoryBase(Directory): def on_event(self, event, collection, name, item): if collection == self.collection: - _logger.debug("%s %s %s %s", event, collection, name, item) + name = sanitize_filename(name) + _logger.debug("collection notify %s %s %s %s", event, collection, name, item) with llfuse.lock: if event == arvados.collection.ADD: self.new_entry(name, item, self.mtime()) elif event == arvados.collection.DEL: ent = self._entries[name] del self._entries[name] - llfuse.invalidate_entry(self.inode, name) + self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding)) self.inodes.del_entry(ent) elif event == arvados.collection.MOD: if hasattr(item, "fuse_entry") and item.fuse_entry is not None: - llfuse.invalidate_inode(item.fuse_entry.inode) + self.inodes.invalidate_inode(item.fuse_entry.inode) elif name in self._entries: - llfuse.invalidate_inode(self._entries[name].inode) + self.inodes.invalidate_inode(self._entries[name].inode) def populate(self, mtime): self._mtime = mtime @@ -243,37 +262,64 @@ class CollectionDirectoryBase(Directory): def writable(self): return self.collection.writable() + @use_counter def flush(self): with llfuse.lock_released: self.collection.root_collection().save() + @use_counter + @check_update def create(self, name): with llfuse.lock_released: self.collection.open(name, "w").close() + @use_counter + @check_update def mkdir(self, name): with llfuse.lock_released: self.collection.mkdirs(name) + @use_counter + @check_update def unlink(self, name): with llfuse.lock_released: self.collection.remove(name) + self.flush() + @use_counter + @check_update def rmdir(self, name): with llfuse.lock_released: self.collection.remove(name) + self.flush() + @use_counter + @check_update def rename(self, name_old, name_new, src): + if not isinstance(src, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + ent = src[name_old] + tgt = self[name_new] + if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile): + pass + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase): + if len(tgt) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile): + raise llfuse.FUSEError(errno.ENOTDIR) + elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EISDIR) + with llfuse.lock_released: - if not isinstance(src, CollectionDirectoryBase): - raise llfuse.FUSEError(errno.EPERM) self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) - self.flush() - src.flush() + self.flush() + src.flush() class CollectionDirectory(CollectionDirectoryBase): - """Represents the root of a directory tree holding a collection.""" + """Represents the root of a directory tree representing a collection.""" def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None): super(CollectionDirectory, self).__init__(parent_inode, inodes, None) @@ -327,7 +373,8 @@ class CollectionDirectory(CollectionDirectoryBase): def uuid(self): return self.collection_locator - def update(self): + @use_counter + def update(self, to_record_version=None): try: if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): return True @@ -342,9 +389,12 @@ class CollectionDirectory(CollectionDirectoryBase): if not self.stale(): return - _logger.debug("Updating %s", self.collection_locator) - if self.collection: - self.collection.update() + _logger.debug("Updating %s", to_record_version) + if self.collection is not None: + if self.collection.known_past_version(to_record_version): + _logger.debug("%s already processed %s", self.collection_locator, to_record_version) + else: + self.collection.update() else: if uuid_pattern.match(self.collection_locator): coll_reader = arvados.collection.Collection( @@ -375,8 +425,8 @@ class CollectionDirectory(CollectionDirectoryBase): return True finally: self._updating_lock.release() - except arvados.errors.NotFoundError: - _logger.exception("arv-mount %s: error", self.collection_locator) + except arvados.errors.NotFoundError as e: + _logger.error("Error fetching collection '%s': %s", self.collection_locator, e) except arvados.errors.ArgumentError as detail: _logger.warning("arv-mount %s: error %s", self.collection_locator, detail) if self.collection_record is not None and "manifest_text" in self.collection_record: @@ -387,8 +437,9 @@ class CollectionDirectory(CollectionDirectoryBase): _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) return False + @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() if item == '.arvados#collection': if self.collection_record_file is None: self.collection_record_file = ObjectFile(self.inode, self.collection_record) @@ -417,6 +468,82 @@ class CollectionDirectory(CollectionDirectoryBase): # footprint directly would be more accurate, but also more complicated. return self._manifest_size * 128 + def finalize(self): + if self.collection is not None: + if self.writable(): + self.collection.save() + self.collection.stop_threads() + + +class TmpCollectionDirectory(CollectionDirectoryBase): + """A directory backed by an Arvados collection that never gets saved. + + This supports using Keep as scratch space. A userspace program can + read the .arvados#collection file to get a current manifest in + order to save a snapshot of the scratch data or use it as a crunch + job output. + """ + + class UnsaveableCollection(arvados.collection.Collection): + def save(self): + pass + def save_new(self): + pass + + def __init__(self, parent_inode, inodes, api_client, num_retries): + collection = self.UnsaveableCollection( + api_client=api_client, + keep_client=api_client.keep) + super(TmpCollectionDirectory, self).__init__( + parent_inode, inodes, collection) + self.collection_record_file = None + self.populate(self.mtime()) + + def on_event(self, *args, **kwargs): + super(TmpCollectionDirectory, self).on_event(*args, **kwargs) + if self.collection_record_file: + with llfuse.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file.inode) + _logger.debug("%s invalidated collection record", self) + + def collection_record(self): + with llfuse.lock_released: + return { + "uuid": None, + "manifest_text": self.collection.manifest_text(), + "portable_data_hash": self.collection.portable_data_hash(), + } + + def __contains__(self, k): + return (k == '.arvados#collection' or + super(TmpCollectionDirectory, self).__contains__(k)) + + @use_counter + def __getitem__(self, item): + if item == '.arvados#collection': + if self.collection_record_file is None: + self.collection_record_file = FuncToJSONFile( + self.inode, self.collection_record) + self.inodes.add_entry(self.collection_record_file) + return self.collection_record_file + return super(TmpCollectionDirectory, self).__getitem__(item) + + def persisted(self): + return False + + def writable(self): + return True + + def finalize(self): + self.collection.stop_threads() + + def invalidate(self): + if self.collection_record_file: + self.collection_record_file.invalidate() + super(TmpCollectionDirectory, self).invalidate() + + class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. @@ -431,18 +558,20 @@ class MagicDirectory(Directory): README_TEXT = """ This directory provides access to Arvados collections as subdirectories listed by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in -the form '1234567890abcdefghijklmnopqrstuv+123'). +the form '1234567890abcdef0123456789abcdef+123'). Note that this directory will appear empty until you attempt to access a specific collection subdirectory (such as trying to 'cd' into it), at which point the collection will actually be looked up on the server and the directory will appear if it exists. + """.lstrip() - def __init__(self, parent_inode, inodes, api, num_retries): + def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False): super(MagicDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries + self.pdh_only = pdh_only def __setattr__(self, name, value): super(MagicDirectory, self).__setattr__(name, value) @@ -454,13 +583,13 @@ will appear if it exists. # If we're the root directory, add an identical by_id subdirectory. if self.inode == llfuse.ROOT_INODE: self._entries['by_id'] = self.inodes.add_entry(MagicDirectory( - self.inode, self.inodes, self.api, self.num_retries)) + self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)) def __contains__(self, k): if k in self._entries: return True - if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k): + if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)): return False try: @@ -468,12 +597,17 @@ will appear if it exists. self.inode, self.inodes, self.api, self.num_retries, k)) if e.update(): - self._entries[k] = e + if k not in self._entries: + self._entries[k] = e + else: + self.inodes.del_entry(e) return True else: + self.inodes.del_entry(e) return False except Exception as e: _logger.debug('arv-mount exception keep %s', e) + self.inodes.del_entry(e) return False def __getitem__(self, item): @@ -506,6 +640,7 @@ class TagsDirectory(RecursiveInvalidateDirectory): self._poll = True self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: tags = self.api.links().list( @@ -533,6 +668,7 @@ class TagDirectory(Directory): self._poll = poll self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: taggedcollections = self.api.links().list( @@ -581,6 +717,7 @@ class ProjectDirectory(Directory): def uuid(self): return self.project_uuid + @use_counter def update(self): if self.project_object_file == None: self.project_object_file = ObjectFile(self.inode, self.project_object) @@ -634,8 +771,9 @@ class ProjectDirectory(Directory): finally: self._updating_lock.release() + @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() if item == '.arvados#project': return self.project_object_file else: @@ -647,6 +785,8 @@ class ProjectDirectory(Directory): else: return super(ProjectDirectory, self).__contains__(k) + @use_counter + @check_update def writable(self): with llfuse.lock_released: if not self._current_user: @@ -656,6 +796,8 @@ class ProjectDirectory(Directory): def persisted(self): return True + @use_counter + @check_update def mkdir(self, name): try: with llfuse.lock_released: @@ -667,6 +809,8 @@ class ProjectDirectory(Directory): _logger.error(error) raise llfuse.FUSEError(errno.EEXIST) + @use_counter + @check_update def rmdir(self, name): if name not in self: raise llfuse.FUSEError(errno.ENOENT) @@ -678,12 +822,32 @@ class ProjectDirectory(Directory): self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries) self.invalidate() + @use_counter + @check_update def rename(self, name_old, name_new, src): - with llfuse.lock_released: - if not isinstance(src, ProjectDirectory): - raise llfuse.FUSEError(errno.EPERM) + if not isinstance(src, ProjectDirectory): + raise llfuse.FUSEError(errno.EPERM) - raise NotImplementedError() + ent = src[name_old] + + if not isinstance(ent, CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + # POSIX semantics for replacing one directory with another is + # tricky (the target directory must be empty, the operation must be + # atomic which isn't possible with the Arvados API as of this + # writing) so don't support that. + raise llfuse.FUSEError(errno.EPERM) + + self.api.collections().update(uuid=ent.uuid(), + body={"owner_uuid": self.uuid(), + "name": name_new}).execute(num_retries=self.num_retries) + + # Acually move the entry from source directory to this directory. + del src._entries[name_old] + self._entries[name_new] = ent + self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding)) class SharedDirectory(Directory): @@ -698,6 +862,7 @@ class SharedDirectory(Directory): self._poll = True self._poll_time = poll_time + @use_counter def update(self): with llfuse.lock_released: all_projects = arvados.util.list_all(