X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5e27876fa4d3faf3b973282bfb4f152c02345bdc..8b3478bda6764b3f30aef69ec0a93729495296c0:/services/fuse/arvados_fuse/__init__.py diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index b24aaa6d3d..fd25aa9b5e 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -66,6 +66,7 @@ import itertools import ciso8601 import collections import functools +import arvados.keep import Queue @@ -148,7 +149,9 @@ class InodeCache(object): self._total -= obj.cache_size del self._entries[obj.cache_priority] if obj.cache_uuid: - del self._by_uuid[obj.cache_uuid] + self._by_uuid[obj.cache_uuid].remove(obj) + if not self._by_uuid[obj.cache_uuid]: + del self._by_uuid[obj.cache_uuid] obj.cache_uuid = None if clear: _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total) @@ -168,9 +171,13 @@ class InodeCache(object): self._entries[obj.cache_priority] = obj obj.cache_uuid = obj.uuid() if obj.cache_uuid: - self._by_uuid[obj.cache_uuid] = obj + if obj.cache_uuid not in self._by_uuid: + self._by_uuid[obj.cache_uuid] = [obj] + else: + if obj not in self._by_uuid[obj.cache_uuid]: + self._by_uuid[obj.cache_uuid].append(obj) self._total += obj.objsize() - _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total) + _logger.debug("InodeCache touched %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total) self.cap_cache() else: obj.cache_priority = None @@ -188,6 +195,11 @@ class InodeCache(object): def find(self, uuid): return self._by_uuid.get(uuid) + def clear(self): + self._entries.clear() + self._by_uuid.clear() + self._total = 0 + class Inodes(object): """Manage the set of inodes. This is the mapping from a numeric id to a concrete File or Directory object""" @@ -244,6 +256,17 @@ class Inodes(object): def invalidate_entry(self, inode, name): llfuse.invalidate_entry(inode, name) + def clear(self): + self.inode_cache.clear() + + for k,v in self._entries.items(): + try: + v.finalize() + except Exception as e: + _logger.exception("Error during finalize of inode %i", k) + + self._entries.clear() + def catch_exceptions(orig_func): """Catch uncaught exceptions and log them consistently.""" @@ -301,6 +324,11 @@ class Operations(llfuse.Operations): self.num_retries = num_retries + self.read_counter = arvados.keep.Counter() + self.write_counter = arvados.keep.Counter() + self.read_ops_counter = arvados.keep.Counter() + self.write_ops_counter = arvados.keep.Counter() + self.events = None def init(self): @@ -314,12 +342,7 @@ class Operations(llfuse.Operations): self.events.close() self.events = None - for k,v in self.inodes.items(): - try: - v.finalize() - except Exception as e: - _logger.exception("Error during finalize of inode %i", k) - self.inodes = None + self.inodes.clear() def access(self, inode, mode, ctx): return True @@ -333,20 +356,21 @@ class Operations(llfuse.Operations): def on_event(self, ev): if 'event_type' in ev: with llfuse.lock: - item = self.inodes.inode_cache.find(ev["object_uuid"]) - if item is not None: - item.invalidate() - if ev["object_kind"] == "arvados#collection": - new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"] - - # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which - # should always be the same. - #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None - record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None - - item.update(to_record_version=record_version) - else: - item.update() + items = self.inodes.inode_cache.find(ev["object_uuid"]) + if items is not None: + for item in items: + item.invalidate() + if ev["object_kind"] == "arvados#collection": + new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"] + + # new_attributes.modified_at currently lacks subsecond precision (see #6347) so use event_at which + # should always be the same. + #record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None + record_version = (ev["event_at"], new_attr["portable_data_hash"]) if new_attr else None + + item.update(to_record_version=record_version) + else: + item.update() oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid") olditemparent = self.inodes.inode_cache.find(oldowner) @@ -466,6 +490,8 @@ class Operations(llfuse.Operations): @catch_exceptions def read(self, fh, off, size): _logger.debug("arv-mount read %i %i %i", fh, off, size) + self.read_ops_counter.add(1) + if fh in self._filehandles: handle = self._filehandles[fh] else: @@ -473,11 +499,16 @@ class Operations(llfuse.Operations): self.inodes.touch(handle.obj) - return handle.obj.readfrom(off, size, self.num_retries) + r = handle.obj.readfrom(off, size, self.num_retries) + if r: + self.read_counter.add(len(r)) + return r @catch_exceptions def write(self, fh, off, buf): _logger.debug("arv-mount write %i %i %i", fh, off, len(buf)) + self.write_ops_counter.add(1) + if fh in self._filehandles: handle = self._filehandles[fh] else: @@ -488,7 +519,10 @@ class Operations(llfuse.Operations): self.inodes.touch(handle.obj) - return handle.obj.writeto(off, buf, self.num_retries) + w = handle.obj.writeto(off, buf, self.num_retries) + if w: + self.write_counter.add(w) + return w @catch_exceptions def release(self, fh):