X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a11c0e77a0d114d50a997db93174465c9aa85f5a..7e7ada63bca240416584871398076c1bafc90f76:/services/fuse/arvados_fuse/fusedir.py diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index 04c2d50642..30ae6b40e0 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -1,3 +1,7 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + import logging import re import time @@ -8,8 +12,9 @@ import functools import threading from apiclient import errors as apiclient_errors import errno +import time -from fusefile import StringFile, ObjectFile, FuseArvadosFile +from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile from fresh import FreshBase, convertTime, use_counter, check_update import arvados.collection @@ -155,24 +160,38 @@ class Directory(FreshBase): self.fresh() - def clear(self, force=False): - """Delete all entries""" + def in_use(self): + if super(Directory, self).in_use(): + return True + for v in self._entries.itervalues(): + if v.in_use(): + return True + return False - if not self.in_use() or force: - oldentries = self._entries - self._entries = {} - for n in oldentries: - if not oldentries[n].clear(force): - self._entries = oldentries - return False - for n in oldentries: - self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) - self.inodes.del_entry(oldentries[n]) - self.inodes.invalidate_inode(self.inode) - self.invalidate() + def has_ref(self, only_children): + if super(Directory, self).has_ref(only_children): return True - else: - return False + for v in self._entries.itervalues(): + if v.has_ref(False): + return True + return False + + def clear(self): + """Delete all entries""" + oldentries = self._entries + self._entries = {} + for n in oldentries: + oldentries[n].clear() + self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) + self.inodes.del_entry(oldentries[n]) + self.inodes.invalidate_inode(self.inode) + self.invalidate() + + def kernel_invalidate(self): + for n, e in self._entries.iteritems(): + self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding)) + e.kernel_invalidate() + self.inodes.invalidate_inode(self.inode) def mtime(self): return self._mtime @@ -183,6 +202,9 @@ class Directory(FreshBase): def flush(self): pass + def want_event_subscribe(self): + raise NotImplementedError() + def create(self, name): raise NotImplementedError() @@ -316,6 +338,10 @@ class CollectionDirectoryBase(Directory): self.flush() src.flush() + def clear(self): + super(CollectionDirectoryBase, self).clear() + self.collection = None + class CollectionDirectory(CollectionDirectoryBase): """Represents the root of a directory tree representing a collection.""" @@ -326,6 +352,13 @@ class CollectionDirectory(CollectionDirectoryBase): self.num_retries = num_retries self.collection_record_file = None self.collection_record = None + self._poll = True + try: + self._poll_time = (api._rootDesc.get('blobSignatureTtl', 60*60*2)/2) + except: + _logger.debug("Error getting blobSignatureTtl from discovery document: %s", sys.exc_info()[0]) + self._poll_time = 60*60 + if isinstance(collection_record, dict): self.collection_locator = collection_record['uuid'] self._mtime = convertTime(collection_record.get('modified_at')) @@ -343,6 +376,9 @@ class CollectionDirectory(CollectionDirectoryBase): def writable(self): return self.collection.writable() if self.collection is not None else self._writable + def want_event_subscribe(self): + return (uuid_pattern.match(self.collection_locator) is not None) + # Used by arv-web.py to switch the contents of the CollectionDirectory def change_collection(self, new_locator): """Switch the contents of the CollectionDirectory. @@ -356,7 +392,7 @@ class CollectionDirectory(CollectionDirectoryBase): def new_collection(self, new_collection_record, coll_reader): if self.inode: - self.clear(force=True) + self.clear() self.collection_record = new_collection_record @@ -388,7 +424,7 @@ class CollectionDirectory(CollectionDirectoryBase): if not self.stale(): return - _logger.debug("Updating %s", to_record_version) + _logger.debug("Updating collection %s inode %s to record version %s", self.collection_locator, self.inode, to_record_version) if self.collection is not None: if self.collection.known_past_version(to_record_version): _logger.debug("%s already processed %s", self.collection_locator, to_record_version) @@ -434,6 +470,7 @@ class CollectionDirectory(CollectionDirectoryBase): _logger.exception("arv-mount %s: error", self.collection_locator) if self.collection_record is not None and "manifest_text" in self.collection_record: _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"]) + self.invalidate() return False @use_counter @@ -473,6 +510,12 @@ class CollectionDirectory(CollectionDirectoryBase): self.collection.save() self.collection.stop_threads() + def clear(self): + if self.collection is not None: + self.collection.stop_threads() + super(CollectionDirectory, self).clear() + self._manifest_size = 0 + class TmpCollectionDirectory(CollectionDirectoryBase): """A directory backed by an Arvados collection that never gets saved. @@ -483,43 +526,37 @@ class TmpCollectionDirectory(CollectionDirectoryBase): job output. """ + class UnsaveableCollection(arvados.collection.Collection): + def save(self): + pass + def save_new(self): + pass + def __init__(self, parent_inode, inodes, api_client, num_retries): - collection = arvados.collection.Collection( + collection = self.UnsaveableCollection( api_client=api_client, - keep_client=api_client.keep) - collection.save = self._commit_collection - collection.save_new = self._commit_collection + keep_client=api_client.keep, + num_retries=num_retries) super(TmpCollectionDirectory, self).__init__( parent_inode, inodes, collection) self.collection_record_file = None - self._subscribed = False - self._update_collection_record() - - def update(self, *args, **kwargs): - if not self._subscribed: - with llfuse.lock_released: - self.populate(self.mtime()) - self._subscribed = True - - @use_counter - def _commit_collection(self): - """Commit the data blocks, but don't save the collection to API. + self.populate(self.mtime()) - Update the content of the special .arvados#collection file, if - it has been instantiated. - """ - self.collection.flush() - self._update_collection_record() - if self.collection_record_file is not None: - self.collection_record_file.update(self.collection_record) + def on_event(self, *args, **kwargs): + super(TmpCollectionDirectory, self).on_event(*args, **kwargs) + if self.collection_record_file: + with llfuse.lock: + self.collection_record_file.invalidate() self.inodes.invalidate_inode(self.collection_record_file.inode) + _logger.debug("%s invalidated collection record", self) - def _update_collection_record(self): - self.collection_record = { - "uuid": None, - "manifest_text": self.collection.manifest_text(), - "portable_data_hash": self.collection.portable_data_hash(), - } + def collection_record(self): + with llfuse.lock_released: + return { + "uuid": None, + "manifest_text": self.collection.manifest_text(), + "portable_data_hash": self.collection.portable_data_hash(), + } def __contains__(self, k): return (k == '.arvados#collection' or @@ -529,18 +566,29 @@ class TmpCollectionDirectory(CollectionDirectoryBase): def __getitem__(self, item): if item == '.arvados#collection': if self.collection_record_file is None: - self.collection_record_file = ObjectFile( + self.collection_record_file = FuncToJSONFile( self.inode, self.collection_record) self.inodes.add_entry(self.collection_record_file) return self.collection_record_file return super(TmpCollectionDirectory, self).__getitem__(item) + def persisted(self): + return False + def writable(self): return True + def want_event_subscribe(self): + return False + def finalize(self): self.collection.stop_threads() + def invalidate(self): + if self.collection_record_file: + self.collection_record_file.invalidate() + super(TmpCollectionDirectory, self).invalidate() + class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. @@ -601,10 +649,11 @@ will appear if it exists. self.inodes.del_entry(e) return True else: + self.inodes.invalidate_entry(self.inode, k) self.inodes.del_entry(e) return False - except Exception as e: - _logger.debug('arv-mount exception keep %s', e) + except Exception as ex: + _logger.debug('arv-mount exception keep %s', ex) self.inodes.del_entry(e) return False @@ -614,21 +663,14 @@ will appear if it exists. else: raise KeyError("No collection with id " + item) - def clear(self, force=False): + def clear(self): pass - -class RecursiveInvalidateDirectory(Directory): - def invalidate(self): - try: - super(RecursiveInvalidateDirectory, self).invalidate() - for a in self._entries: - self._entries[a].invalidate() - except Exception: - _logger.exception() + def want_event_subscribe(self): + return not self.pdh_only -class TagsDirectory(RecursiveInvalidateDirectory): +class TagsDirectory(Directory): """A special directory that contains as subdirectories all tags visible to the user.""" def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60): @@ -637,20 +679,50 @@ class TagsDirectory(RecursiveInvalidateDirectory): self.num_retries = num_retries self._poll = True self._poll_time = poll_time + self._extra = set() + + def want_event_subscribe(self): + return True @use_counter def update(self): with llfuse.lock_released: tags = self.api.links().list( - filters=[['link_class', '=', 'tag']], - select=['name'], distinct=True + filters=[['link_class', '=', 'tag'], ["name", "!=", ""]], + select=['name'], distinct=True, limit=1000 ).execute(num_retries=self.num_retries) if "items" in tags: - self.merge(tags['items'], + self.merge(tags['items']+[{"name": n} for n in self._extra], lambda i: i['name'], lambda a, i: a.tag == i['name'], lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time)) + @use_counter + @check_update + def __getitem__(self, item): + if super(TagsDirectory, self).__contains__(item): + return super(TagsDirectory, self).__getitem__(item) + with llfuse.lock_released: + tags = self.api.links().list( + filters=[['link_class', '=', 'tag'], ['name', '=', item]], limit=1 + ).execute(num_retries=self.num_retries) + if tags["items"]: + self._extra.add(item) + self.update() + return super(TagsDirectory, self).__getitem__(item) + + @use_counter + @check_update + def __contains__(self, k): + if super(TagsDirectory, self).__contains__(k): + return True + try: + self[k] + return True + except KeyError: + pass + return False + class TagDirectory(Directory): """A special directory that contains as subdirectories all collections visible @@ -666,6 +738,9 @@ class TagDirectory(Directory): self._poll = poll self._poll_time = poll_time + def want_event_subscribe(self): + return True + @use_counter def update(self): with llfuse.lock_released: @@ -697,6 +772,9 @@ class ProjectDirectory(Directory): self._updating_lock = threading.Lock() self._current_user = None + def want_event_subscribe(self): + return True + def createDirectory(self, i): if collection_uuid_pattern.match(i['uuid']): return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i) @@ -917,3 +995,6 @@ class SharedDirectory(Directory): lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) except Exception: _logger.exception() + + def want_event_subscribe(self): + return True