X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d14dd75b263d8f999603b66d23f74667d36a2412..89be4b30feccc3680ca77339711b29367754dc05:/services/fuse/arvados_fuse/fusedir.py?ds=sidebyside diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 2757091aa4..2d58012fa8 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + import logging import re import time @@ -5,10 +9,15 @@ import llfuse import arvados import apiclient import functools +import threading +from apiclient import errors as apiclient_errors +import errno +import time -from fusefile import StringFile, StreamReaderFile, ObjectFile -from fresh import FreshBase, convertTime, use_counter +from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile +from fresh import FreshBase, convertTime, use_counter, check_update +import arvados.collection from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern _logger = logging.getLogger('arvados.arvados_fuse') @@ -19,6 +28,7 @@ _logger = logging.getLogger('arvados.arvados_fuse') # appear as underscores in the fuse mount.) _disallowed_filename_characters = re.compile('[\x00/]') +# '.' and '..' are not reachable if API server is newer than #6277 def sanitize_filename(dirty): """Replace disallowed filename characters with harmless "_".""" if dirty is None: @@ -41,9 +51,10 @@ class Directory(FreshBase): """ def __init__(self, parent_inode, inodes): + """parent_inode is the integer inode number""" + super(Directory, self).__init__() - """parent_inode is the integer inode number""" self.inode = None if not isinstance(parent_inode, int): raise Exception("parent_inode should be an int") @@ -71,23 +82,28 @@ class Directory(FreshBase): try: self.update() except apiclient.errors.HttpError as e: - _logger.debug(e) + _logger.warn(e) @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() return self._entries[item] @use_counter + @check_update def items(self): - self.checkupdate() return list(self._entries.items()) @use_counter + @check_update def __contains__(self, k): - self.checkupdate() return k in self._entries + @use_counter + @check_update + def __len__(self): + return len(self._entries) + def fresh(self): self.inodes.touch(self) super(Directory, self).fresh() @@ -124,6 +140,7 @@ class Directory(FreshBase): self._entries[name] = oldentries[name] del oldentries[name] else: + _logger.debug("Adding entry '%s' to inode %i", name, self.inode) # create new directory entry ent = new_entry(i) if ent is not None: @@ -132,58 +149,241 @@ class Directory(FreshBase): # delete any other directory entries that were not in found in 'items' for i in oldentries: - llfuse.invalidate_entry(self.inode, str(i)) + _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode) + self.inodes.invalidate_entry(self, i) self.inodes.del_entry(oldentries[i]) changed = True if changed: + self.inodes.invalidate_inode(self) self._mtime = time.time() self.fresh() - def clear(self, force=False): - """Delete all entries""" + def in_use(self): + if super(Directory, self).in_use(): + return True + for v in self._entries.itervalues(): + if v.in_use(): + return True + return False - if not self.in_use() or force: - oldentries = self._entries - self._entries = {} - for n in oldentries: - if not oldentries[n].clear(force): - self._entries = oldentries - return False - for n in oldentries: - llfuse.invalidate_entry(self.inode, str(n)) - self.inodes.del_entry(oldentries[n]) - llfuse.invalidate_inode(self.inode) - self.invalidate() + def has_ref(self, only_children): + if super(Directory, self).has_ref(only_children): return True - else: - return False + for v in self._entries.itervalues(): + if v.has_ref(False): + return True + return False + + def clear(self): + """Delete all entries""" + oldentries = self._entries + self._entries = {} + for n in oldentries: + oldentries[n].clear() + self.inodes.del_entry(oldentries[n]) + self.invalidate() + + def kernel_invalidate(self): + # Invalidating the dentry on the parent implies invalidating all paths + # below it as well. + parent = self.inodes[self.parent_inode] + + # Find self on the parent in order to invalidate this path. + # Calling the public items() method might trigger a refresh, + # which we definitely don't want, so read the internal dict directly. + for k,v in parent._entries.items(): + if v is self: + self.inodes.invalidate_entry(parent, k) + break def mtime(self): return self._mtime + def writable(self): + return False + + def flush(self): + pass + + def want_event_subscribe(self): + raise NotImplementedError() + + def create(self, name): + raise NotImplementedError() + + def mkdir(self, name): + raise NotImplementedError() + + def unlink(self, name): + raise NotImplementedError() + + def rmdir(self, name): + raise NotImplementedError() + + def rename(self, name_old, name_new, src): + raise NotImplementedError() + + +class CollectionDirectoryBase(Directory): + """Represent an Arvados Collection as a directory. + + This class is used for Subcollections, and is also the base class for + CollectionDirectory, which implements collection loading/saving on + Collection records. + + Most operations act only the underlying Arvados `Collection` object. The + `Collection` object signals via a notify callback to + `CollectionDirectoryBase.on_event` that an item was added, removed or + modified. FUSE inodes and directory entries are created, deleted or + invalidated in response to these events. + + """ + + def __init__(self, parent_inode, inodes, collection): + super(CollectionDirectoryBase, self).__init__(parent_inode, inodes) + self.collection = collection + + def new_entry(self, name, item, mtime): + name = sanitize_filename(name) + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + if item.fuse_entry.dead is not True: + raise Exception("Can only reparent dead inode entry") + if item.fuse_entry.inode is None: + raise Exception("Reparented entry must still have valid inode") + item.fuse_entry.dead = False + self._entries[name] = item.fuse_entry + elif isinstance(item, arvados.collection.RichCollectionBase): + self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item)) + self._entries[name].populate(mtime) + else: + self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime)) + item.fuse_entry = self._entries[name] + + def on_event(self, event, collection, name, item): + if collection == self.collection: + name = sanitize_filename(name) + _logger.debug("collection notify %s %s %s %s", event, collection, name, item) + with llfuse.lock: + if event == arvados.collection.ADD: + self.new_entry(name, item, self.mtime()) + elif event == arvados.collection.DEL: + ent = self._entries[name] + del self._entries[name] + self.inodes.invalidate_entry(self, name) + self.inodes.del_entry(ent) + elif event == arvados.collection.MOD: + if hasattr(item, "fuse_entry") and item.fuse_entry is not None: + self.inodes.invalidate_inode(item.fuse_entry) + elif name in self._entries: + self.inodes.invalidate_inode(self._entries[name]) + + def populate(self, mtime): + self._mtime = mtime + self.collection.subscribe(self.on_event) + for entry, item in self.collection.items(): + self.new_entry(entry, item, self.mtime()) + + def writable(self): + return self.collection.writable() + + @use_counter + def flush(self): + with llfuse.lock_released: + self.collection.root_collection().save() + + @use_counter + @check_update + def create(self, name): + with llfuse.lock_released: + self.collection.open(name, "w").close() + + @use_counter + @check_update + def mkdir(self, name): + with llfuse.lock_released: + self.collection.mkdirs(name) + + @use_counter + @check_update + def unlink(self, name): + with llfuse.lock_released: + self.collection.remove(name) + self.flush() + + @use_counter + @check_update + def rmdir(self, name): + with llfuse.lock_released: + self.collection.remove(name) + self.flush() + + @use_counter + @check_update + def rename(self, name_old, name_new, src): + if not isinstance(src, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + ent = src[name_old] + tgt = self[name_new] + if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile): + pass + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase): + if len(tgt) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile): + raise llfuse.FUSEError(errno.ENOTDIR) + elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase): + raise llfuse.FUSEError(errno.EISDIR) + + with llfuse.lock_released: + self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True) + self.flush() + src.flush() + + def clear(self): + super(CollectionDirectoryBase, self).clear() + self.collection = None -class CollectionDirectory(Directory): - """Represents the root of a directory tree holding a collection.""" - def __init__(self, parent_inode, inodes, api, num_retries, collection): - super(CollectionDirectory, self).__init__(parent_inode, inodes) +class CollectionDirectory(CollectionDirectoryBase): + """Represents the root of a directory tree representing a collection.""" + + def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None): + super(CollectionDirectory, self).__init__(parent_inode, inodes, None) self.api = api self.num_retries = num_retries - self.collection_object_file = None - self.collection_object = None - if isinstance(collection, dict): - self.collection_locator = collection['uuid'] - self._mtime = convertTime(collection.get('modified_at')) + self.collection_record_file = None + self.collection_record = None + self._poll = True + try: + self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2) + except: + _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0]) + self._poll_time = 60*60 + + if isinstance(collection_record, dict): + self.collection_locator = collection_record['uuid'] + self._mtime = convertTime(collection_record.get('modified_at')) else: - self.collection_locator = collection + self.collection_locator = collection_record self._mtime = 0 self._manifest_size = 0 + if self.collection_locator: + self._writable = (uuid_pattern.match(self.collection_locator) is not None) + self._updating_lock = threading.Lock() def same(self, i): return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator + def writable(self): + return self.collection.writable() if self.collection is not None else self._writable + + def want_event_subscribe(self): + return (uuid_pattern.match(self.collection_locator) is not None) + # Used by arv-web.py to switch the contents of the CollectionDirectory def change_collection(self, new_locator): """Switch the contents of the CollectionDirectory. @@ -192,82 +392,100 @@ class CollectionDirectory(Directory): """ self.collection_locator = new_locator - self.collection_object = None + self.collection_record = None self.update() - def new_collection(self, new_collection_object, coll_reader): - self.clear(force=True) + def new_collection(self, new_collection_record, coll_reader): + if self.inode: + self.clear() - self.collection_object = new_collection_object + self.collection_record = new_collection_record - self._mtime = convertTime(self.collection_object.get('modified_at')) + if self.collection_record: + self._mtime = convertTime(self.collection_record.get('modified_at')) + self.collection_locator = self.collection_record["uuid"] + if self.collection_record_file is not None: + self.collection_record_file.update(self.collection_record) - if self.collection_object_file is not None: - self.collection_object_file.update(self.collection_object) + self.collection = coll_reader + self.populate(self.mtime()) - for s in coll_reader.all_streams(): - cwd = self - for part in s.name().split('/'): - if part != '' and part != '.': - partname = sanitize_filename(part) - if partname not in cwd._entries: - cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes)) - cwd = cwd._entries[partname] - for k, v in s.files().items(): - cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime())) + def uuid(self): + return self.collection_locator - def update(self): + @use_counter + def update(self, to_record_version=None): try: - if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator): + if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator): return True if self.collection_locator is None: self.fresh() return True - with llfuse.lock_released: - coll_reader = arvados.CollectionReader( - self.collection_locator, self.api, self.api.keep, - num_retries=self.num_retries) - new_collection_object = coll_reader.api_response() or {} - # If the Collection only exists in Keep, there will be no API - # response. Fill in the fields we need. - if 'uuid' not in new_collection_object: - new_collection_object['uuid'] = self.collection_locator - if "portable_data_hash" not in new_collection_object: - new_collection_object["portable_data_hash"] = new_collection_object["uuid"] - if 'manifest_text' not in new_collection_object: - new_collection_object['manifest_text'] = coll_reader.manifest_text() - coll_reader.normalize() - # end with llfuse.lock_released, re-acquire lock - - if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]: - self.new_collection(new_collection_object, coll_reader) - - self._manifest_size = len(coll_reader.manifest_text()) - _logger.debug("%s manifest_size %i", self, self._manifest_size) + try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) + if self.collection is not None: + if self.collection.known_past_version(to_record_version): + _logger.debug("%s already processed %s", self.collection_locator, to_record_version) + else: + self.collection.update() + else: + if uuid_pattern.match(self.collection_locator): + coll_reader = arvados.collection.Collection( + self.collection_locator, self.api, self.api.keep, + num_retries=self.num_retries) + else: + coll_reader = arvados.collection.CollectionReader( + self.collection_locator, self.api, self.api.keep, + num_retries=self.num_retries) + new_collection_record = coll_reader.api_response() or {} + # If the Collection only exists in Keep, there will be no API + # response. Fill in the fields we need. + if 'uuid' not in new_collection_record: + new_collection_record['uuid'] = self.collection_locator + if "portable_data_hash" not in new_collection_record: + new_collection_record["portable_data_hash"] = new_collection_record["uuid"] + if 'manifest_text' not in new_collection_record: + new_collection_record['manifest_text'] = coll_reader.manifest_text() + + if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"): + self.new_collection(new_collection_record, coll_reader) + + self._manifest_size = len(coll_reader.manifest_text()) + _logger.debug("%s manifest_size %i", self, self._manifest_size) + # end with llfuse.lock_released, re-acquire lock - self.fresh() - return True - except arvados.errors.NotFoundError: - _logger.exception("arv-mount %s: error", self.collection_locator) + self.fresh() + return True + finally: + self._updating_lock.release() + except arvados.errors.NotFoundError as e: + _logger.error("Error fetching collection '%s': %s", self.collection_locator, e) except arvados.errors.ArgumentError as detail: _logger.warning("arv-mount %s: error %s", self.collection_locator, detail) - if self.collection_object is not None and "manifest_text" in self.collection_object: - _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"]) + if self.collection_record is not None and "manifest_text" in self.collection_record: + _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) except Exception: _logger.exception("arv-mount %s: error", self.collection_locator) - if self.collection_object is not None and "manifest_text" in self.collection_object: - _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"]) + if self.collection_record is not None and "manifest_text" in self.collection_record: + _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + self.invalidate() return False + @use_counter + @check_update def __getitem__(self, item): - self.checkupdate() if item == '.arvados#collection': - if self.collection_object_file is None: - self.collection_object_file = ObjectFile(self.inode, self.collection_object) - self.inodes.add_entry(self.collection_object_file) - return self.collection_object_file + if self.collection_record_file is None: + self.collection_record_file = ObjectFile(self.inode, self.collection_record) + self.inodes.add_entry(self.collection_record_file) + return self.collection_record_file else: return super(CollectionDirectory, self).__getitem__(item) @@ -278,8 +496,8 @@ class CollectionDirectory(Directory): return super(CollectionDirectory, self).__contains__(k) def invalidate(self): - self.collection_object = None - self.collection_object_file = None + self.collection_record = None + self.collection_record_file = None super(CollectionDirectory, self).invalidate() def persisted(self): @@ -291,6 +509,92 @@ class CollectionDirectory(Directory): # footprint directly would be more accurate, but also more complicated. return self._manifest_size * 128 + def finalize(self): + if self.collection is not None: + if self.writable(): + self.collection.save() + self.collection.stop_threads() + + def clear(self): + if self.collection is not None: + self.collection.stop_threads() + super(CollectionDirectory, self).clear() + self._manifest_size = 0 + + +class TmpCollectionDirectory(CollectionDirectoryBase): + """A directory backed by an Arvados collection that never gets saved. + + This supports using Keep as scratch space. A userspace program can + read the .arvados#collection file to get a current manifest in + order to save a snapshot of the scratch data or use it as a crunch + job output. + """ + + class UnsaveableCollection(arvados.collection.Collection): + def save(self): + pass + def save_new(self): + pass + + def __init__(self, parent_inode, inodes, api_client, num_retries): + collection = self.UnsaveableCollection( + api_client=api_client, + keep_client=api_client.keep, + num_retries=num_retries) + super(TmpCollectionDirectory, self).__init__( + parent_inode, inodes, collection) + self.collection_record_file = None + self.populate(self.mtime()) + + def on_event(self, *args, **kwargs): + super(TmpCollectionDirectory, self).on_event(*args, **kwargs) + if self.collection_record_file: + with llfuse.lock: + self.collection_record_file.invalidate() + self.inodes.invalidate_inode(self.collection_record_file) + _logger.debug("%s invalidated collection record", self) + + def collection_record(self): + with llfuse.lock_released: + return { + "uuid": None, + "manifest_text": self.collection.manifest_text(), + "portable_data_hash": self.collection.portable_data_hash(), + } + + def __contains__(self, k): + return (k == '.arvados#collection' or + super(TmpCollectionDirectory, self).__contains__(k)) + + @use_counter + def __getitem__(self, item): + if item == '.arvados#collection': + if self.collection_record_file is None: + self.collection_record_file = FuncToJSONFile( + self.inode, self.collection_record) + self.inodes.add_entry(self.collection_record_file) + return self.collection_record_file + return super(TmpCollectionDirectory, self).__getitem__(item) + + def persisted(self): + return False + + def writable(self): + return True + + def want_event_subscribe(self): + return False + + def finalize(self): + self.collection.stop_threads() + + def invalidate(self): + if self.collection_record_file: + self.collection_record_file.invalidate() + super(TmpCollectionDirectory, self).invalidate() + + class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. @@ -305,18 +609,21 @@ class MagicDirectory(Directory): README_TEXT = """ This directory provides access to Arvados collections as subdirectories listed by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in -the form '1234567890abcdefghijklmnopqrstuv+123'). +the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid +(in the form 'zzzzz-j7d0g-1234567890abcde'). Note that this directory will appear empty until you attempt to access a -specific collection subdirectory (such as trying to 'cd' into it), at which -point the collection will actually be looked up on the server and the directory -will appear if it exists. +specific collection or project subdirectory (such as trying to 'cd' into it), +at which point the collection or project will actually be looked up on the server +and the directory will appear if it exists. + """.lstrip() - def __init__(self, parent_inode, inodes, api, num_retries): + def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False): super(MagicDirectory, self).__init__(parent_inode, inodes) self.api = api self.num_retries = num_retries + self.pdh_only = pdh_only def __setattr__(self, name, value): super(MagicDirectory, self).__setattr__(name, value) @@ -328,26 +635,43 @@ will appear if it exists. # If we're the root directory, add an identical by_id subdirectory. if self.inode == llfuse.ROOT_INODE: self._entries['by_id'] = self.inodes.add_entry(MagicDirectory( - self.inode, self.inodes, self.api, self.num_retries)) + self.inode, self.inodes, self.api, self.num_retries, self.pdh_only)) def __contains__(self, k): if k in self._entries: return True - if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k): + if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)): return False try: - e = self.inodes.add_entry(CollectionDirectory( - self.inode, self.inodes, self.api, self.num_retries, k)) + e = None + + if group_uuid_pattern.match(k): + project = self.api.groups().list( + filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries) + if project[u'items_available'] == 0: + return False + e = self.inodes.add_entry(ProjectDirectory( + self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0])) + else: + e = self.inodes.add_entry(CollectionDirectory( + self.inode, self.inodes, self.api, self.num_retries, k)) if e.update(): - self._entries[k] = e + if k not in self._entries: + self._entries[k] = e + else: + self.inodes.del_entry(e) return True else: + self.inodes.invalidate_entry(self, k) + self.inodes.del_entry(e) return False - except Exception as e: - _logger.debug('arv-mount exception keep %s', e) + except Exception as ex: + _logger.exception("arv-mount lookup '%s':", k) + if e is not None: + self.inodes.del_entry(e) return False def __getitem__(self, item): @@ -356,21 +680,14 @@ will appear if it exists. else: raise KeyError("No collection with id " + item) - def clear(self, force=False): + def clear(self): pass - -class RecursiveInvalidateDirectory(Directory): - def invalidate(self): - try: - super(RecursiveInvalidateDirectory, self).invalidate() - for a in self._entries: - self._entries[a].invalidate() - except Exception: - _logger.exception() + def want_event_subscribe(self): + return not self.pdh_only -class TagsDirectory(RecursiveInvalidateDirectory): +class TagsDirectory(Directory): """A special directory that contains as subdirectories all tags visible to the user.""" def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60): @@ -379,19 +696,50 @@ class TagsDirectory(RecursiveInvalidateDirectory): self.num_retries = num_retries self._poll = True self._poll_time = poll_time + self._extra = set() + + def want_event_subscribe(self): + return True + @use_counter def update(self): with llfuse.lock_released: tags = self.api.links().list( - filters=[['link_class', '=', 'tag']], - select=['name'], distinct=True + filters=[['link_class', '=', 'tag'], ["name", "!=", ""]], + select=['name'], distinct=True, limit=1000 ).execute(num_retries=self.num_retries) if "items" in tags: - self.merge(tags['items'], + self.merge(tags['items']+[{"name": n} for n in self._extra], lambda i: i['name'], lambda a, i: a.tag == i['name'], lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)) + @use_counter + @check_update + def __getitem__(self, item): + if super(TagsDirectory, self).__contains__(item): + return super(TagsDirectory, self).__getitem__(item) + with llfuse.lock_released: + tags = self.api.links().list( + filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1 + ).execute(num_retries=self.num_retries) + if tags["items"]: + self._extra.add(item) + self.update() + return super(TagsDirectory, self).__getitem__(item) + + @use_counter + @check_update + def __contains__(self, k): + if super(TagsDirectory, self).__contains__(k): + return True + try: + self[k] + return True + except KeyError: + pass + return False + class TagDirectory(Directory): """A special directory that contains as subdirectories all collections visible @@ -407,6 +755,10 @@ class TagDirectory(Directory): self._poll = poll self._poll_time = poll_time + def want_event_subscribe(self): + return True + + @use_counter def update(self): with llfuse.lock_released: taggedcollections = self.api.links().list( @@ -431,9 +783,15 @@ class ProjectDirectory(Directory): self.num_retries = num_retries self.project_object = project_object self.project_object_file = None - self.uuid = project_object['uuid'] + self.project_uuid = project_object['uuid'] self._poll = poll self._poll_time = poll_time + self._updating_lock = threading.Lock() + self._current_user = None + self._full_listing = False + + def want_event_subscribe(self): + return True def createDirectory(self, i): if collection_uuid_pattern.match(i['uuid']): @@ -450,69 +808,224 @@ class ProjectDirectory(Directory): else: return None + def uuid(self): + return self.project_uuid + + def items(self): + self._full_listing = True + return super(ProjectDirectory, self).items() + + def namefn(self, i): + if 'name' in i: + if i['name'] is None or len(i['name']) == 0: + return None + elif "uuid" in i and (collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid'])): + # collection or subproject + return i['name'] + elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': + # name link + return i['name'] + elif 'kind' in i and i['kind'].startswith('arvados#'): + # something else + return "{}.{}".format(i['name'], i['kind'][8:]) + else: + return None + + + @use_counter def update(self): if self.project_object_file == None: self.project_object_file = ObjectFile(self.inode, self.project_object) self.inodes.add_entry(self.project_object_file) - def namefn(i): - if 'name' in i: - if i['name'] is None or len(i['name']) == 0: - return None - elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']): - # collection or subproject - return i['name'] - elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection': - # name link - return i['name'] - elif 'kind' in i and i['kind'].startswith('arvados#'): - # something else - return "{}.{}".format(i['name'], i['kind'][8:]) - else: - return None + if not self._full_listing: + return True def samefn(a, i): - if isinstance(a, CollectionDirectory): - return a.collection_locator == i['uuid'] - elif isinstance(a, ProjectDirectory): - return a.uuid == i['uuid'] + if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory): + return a.uuid() == i['uuid'] elif isinstance(a, ObjectFile): - return a.uuid == i['uuid'] and not a.stale() + return a.uuid() == i['uuid'] and not a.stale() return False - with llfuse.lock_released: - if group_uuid_pattern.match(self.uuid): - self.project_object = self.api.groups().get( - uuid=self.uuid).execute(num_retries=self.num_retries) - elif user_uuid_pattern.match(self.uuid): - self.project_object = self.api.users().get( - uuid=self.uuid).execute(num_retries=self.num_retries) + try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + if group_uuid_pattern.match(self.project_uuid): + self.project_object = self.api.groups().get( + uuid=self.project_uuid).execute(num_retries=self.num_retries) + elif user_uuid_pattern.match(self.project_uuid): + self.project_object = self.api.users().get( + uuid=self.project_uuid).execute(num_retries=self.num_retries) + + contents = arvados.util.list_all(self.api.groups().list, + self.num_retries, + filters=[["owner_uuid", "=", self.project_uuid], + ["group_class", "=", "project"]]) + contents.extend(arvados.util.list_all(self.api.collections().list, + self.num_retries, + filters=[["owner_uuid", "=", self.project_uuid]])) - contents = arvados.util.list_all(self.api.groups().contents, - self.num_retries, uuid=self.uuid) + # end with llfuse.lock_released, re-acquire lock - # end with llfuse.lock_released, re-acquire lock + self.merge(contents, + self.namefn, + samefn, + self.createDirectory) + return True + finally: + self._updating_lock.release() - self.merge(contents, - namefn, - samefn, - self.createDirectory) + def _add_entry(self, i, name): + ent = self.createDirectory(i) + self._entries[name] = self.inodes.add_entry(ent) + return self._entries[name] - def __getitem__(self, item): - self.checkupdate() - if item == '.arvados#project': + @use_counter + @check_update + def __getitem__(self, k): + if k == '.arvados#project': return self.project_object_file - else: - return super(ProjectDirectory, self).__getitem__(item) + elif self._full_listing or super(ProjectDirectory, self).__contains__(k): + return super(ProjectDirectory, self).__getitem__(k) + with llfuse.lock_released: + contents = self.api.groups().list(filters=[["owner_uuid", "=", self.project_uuid], + ["group_class", "=", "project"], + ["name", "=", k]], + limit=1).execute(num_retries=self.num_retries)["items"] + if not contents: + contents = self.api.collections().list(filters=[["owner_uuid", "=", self.project_uuid], + ["name", "=", k]], + limit=1).execute(num_retries=self.num_retries)["items"] + if contents: + name = sanitize_filename(self.namefn(contents[0])) + if name != k: + raise KeyError(k) + return self._add_entry(contents[0], name) + + # Didn't find item + raise KeyError(k) def __contains__(self, k): if k == '.arvados#project': return True - else: - return super(ProjectDirectory, self).__contains__(k) + try: + self[k] + return True + except KeyError: + pass + return False + + @use_counter + @check_update + def writable(self): + with llfuse.lock_released: + if not self._current_user: + self._current_user = self.api.users().current().execute(num_retries=self.num_retries) + return self._current_user["uuid"] in self.project_object.get("writable_by", []) def persisted(self): - return False + return True + + @use_counter + @check_update + def mkdir(self, name): + try: + with llfuse.lock_released: + self.api.collections().create(body={"owner_uuid": self.project_uuid, + "name": name, + "manifest_text": ""}).execute(num_retries=self.num_retries) + self.invalidate() + except apiclient_errors.Error as error: + _logger.error(error) + raise llfuse.FUSEError(errno.EEXIST) + + @use_counter + @check_update + def rmdir(self, name): + if name not in self: + raise llfuse.FUSEError(errno.ENOENT) + if not isinstance(self[name], CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + if len(self[name]) > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + with llfuse.lock_released: + self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries) + self.invalidate() + + @use_counter + @check_update + def rename(self, name_old, name_new, src): + if not isinstance(src, ProjectDirectory): + raise llfuse.FUSEError(errno.EPERM) + + ent = src[name_old] + + if not isinstance(ent, CollectionDirectory): + raise llfuse.FUSEError(errno.EPERM) + + if name_new in self: + # POSIX semantics for replacing one directory with another is + # tricky (the target directory must be empty, the operation must be + # atomic which isn't possible with the Arvados API as of this + # writing) so don't support that. + raise llfuse.FUSEError(errno.EPERM) + + self.api.collections().update(uuid=ent.uuid(), + body={"owner_uuid": self.uuid(), + "name": name_new}).execute(num_retries=self.num_retries) + + # Acually move the entry from source directory to this directory. + del src._entries[name_old] + self._entries[name_new] = ent + self.inodes.invalidate_entry(src, name_old) + + @use_counter + def child_event(self, ev): + properties = ev.get("properties") or {} + old_attrs = properties.get("old_attributes") or {} + new_attrs = properties.get("new_attributes") or {} + old_attrs["uuid"] = ev["object_uuid"] + new_attrs["uuid"] = ev["object_uuid"] + old_name = sanitize_filename(self.namefn(old_attrs)) + new_name = sanitize_filename(self.namefn(new_attrs)) + + # create events will have a new name, but not an old name + # delete events will have an old name, but not a new name + # update events will have an old and new name, and they may be same or different + # if they are the same, an unrelated field changed and there is nothing to do. + + if old_attrs.get("owner_uuid") != self.project_uuid: + # Was moved from somewhere else, so don't try to remove entry. + old_name = None + if ev.get("object_owner_uuid") != self.project_uuid: + # Was moved to somewhere else, so don't try to add entry + new_name = None + + if old_attrs.get("is_trashed"): + # Was previously deleted + old_name = None + if new_attrs.get("is_trashed"): + # Has been deleted + new_name = None + + if new_name != old_name: + ent = None + if old_name in self._entries: + ent = self._entries[old_name] + del self._entries[old_name] + self.inodes.invalidate_entry(self, old_name) + + if new_name: + if ent is not None: + self._entries[new_name] = ent + else: + self._add_entry(new_attrs, new_name) + elif ent is not None: + self.inodes.del_entry(ent) class SharedDirectory(Directory): @@ -526,57 +1039,91 @@ class SharedDirectory(Directory): self.current_user = api.users().current().execute(num_retries=num_retries) self._poll = True self._poll_time = poll_time + self._updating_lock = threading.Lock() + @use_counter def update(self): - with llfuse.lock_released: - all_projects = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['group_class','=','project']]) - objects = {} - for ob in all_projects: - objects[ob['uuid']] = ob - - roots = [] - root_owners = {} - for ob in all_projects: - if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects: - roots.append(ob) - root_owners[ob['owner_uuid']] = True - - lusers = arvados.util.list_all( - self.api.users().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - lgroups = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - - users = {} - groups = {} - - for l in lusers: - objects[l["uuid"]] = l - for l in lgroups: - objects[l["uuid"]] = l - - contents = {} - for r in root_owners: - if r in objects: - obr = objects[r] - if "name" in obr: - contents[obr["name"]] = obr - if "first_name" in obr: - contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr - - for r in roots: - if r['owner_uuid'] not in objects: - contents[r['name']] = r - - # end with llfuse.lock_released, re-acquire lock - try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + contents = {} + roots = [] + root_owners = set() + objects = {} + + methods = self.api._rootDesc.get('resources')["groups"]['methods'] + if 'httpMethod' in methods.get('shared', {}): + page = [] + while True: + resp = self.api.groups().shared(filters=[['group_class', '=', 'project']]+page, + order="uuid", + limit=10000, + count="none", + include="owner_uuid").execute() + if not resp["items"]: + break + page = [["uuid", ">", resp["items"][len(resp["items"])-1]["uuid"]]] + for r in resp["items"]: + objects[r["uuid"]] = r + roots.append(r["uuid"]) + for r in resp["included"]: + objects[r["uuid"]] = r + root_owners.add(r["uuid"]) + else: + all_projects = arvados.util.list_all( + self.api.groups().list, self.num_retries, + filters=[['group_class','=','project']], + select=["uuid", "owner_uuid"]) + for ob in all_projects: + objects[ob['uuid']] = ob + + current_uuid = self.current_user['uuid'] + for ob in all_projects: + if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects: + roots.append(ob['uuid']) + root_owners.add(ob['owner_uuid']) + + lusers = arvados.util.list_all( + self.api.users().list, self.num_retries, + filters=[['uuid','in', list(root_owners)]]) + lgroups = arvados.util.list_all( + self.api.groups().list, self.num_retries, + filters=[['uuid','in', list(root_owners)+roots]]) + + for l in lusers: + objects[l["uuid"]] = l + for l in lgroups: + objects[l["uuid"]] = l + + for r in root_owners: + if r in objects: + obr = objects[r] + if obr.get("name"): + contents[obr["name"]] = obr + #elif obr.get("username"): + # contents[obr["username"]] = obr + elif "first_name" in obr: + contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr + + for r in roots: + if r in objects: + obr = objects[r] + if obr['owner_uuid'] not in objects: + contents[obr["name"]] = obr + + # end with llfuse.lock_released, re-acquire lock + self.merge(contents.items(), lambda i: i[0], - lambda a, i: a.uuid == i[1]['uuid'], + lambda a, i: a.uuid() == i[1]['uuid'], lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) except Exception: - _logger.exception() + _logger.exception("arv-mount shared dir error") + finally: + self._updating_lock.release() + + def want_event_subscribe(self): + return True