From 416a7c3a1b96abf7982362682048481f2afda0c9 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 14 Apr 2015 15:10:32 -0400 Subject: [PATCH] 3198: Manage inode cache based on (approximate) object size instead of object count. It's only a soft limit on memory usage but still a big improvement. --- sdk/python/arvados/arvfile.py | 4 +- sdk/python/arvados/collection.py | 2 +- sdk/python/arvados/stream.py | 4 +- services/fuse/arvados_fuse/__init__.py | 54 +++++++++++++--------- services/fuse/arvados_fuse/fresh.py | 30 +++++++++++++ services/fuse/arvados_fuse/fusedir.py | 62 +++++++++++--------------- services/fuse/arvados_fuse/fusefile.py | 9 +--- services/fuse/bin/arv-mount | 2 +- 8 files changed, 96 insertions(+), 71 deletions(-) diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py index 3129bdf2ff..ce342b5a41 100644 --- a/sdk/python/arvados/arvfile.py +++ b/sdk/python/arvados/arvfile.py @@ -703,7 +703,7 @@ class ArvadosFile(object): # segment is past the trucate size, all done break elif size < range_end: - nr = Range(r.locator, r.range_start, size - r.range_start) + nr = Range(r.locator, r.range_start, size - r.range_start, 0) nr.segment_offset = r.segment_offset new_segs.append(nr) break @@ -815,7 +815,7 @@ class ArvadosFile(object): """Internal implementation of add_segment.""" self._modified = True for lr in locators_and_ranges(blocks, pos, size): - last = self._segments[-1] if self._segments else Range(0, 0, 0) + last = self._segments[-1] if self._segments else Range(0, 0, 0, 0) r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset) self._segments.append(r) diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 3d48652dd5..f03deedb18 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -1381,7 +1381,7 @@ class Collection(RichCollectionBase): block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok) if block_locator: blocksize = long(block_locator.group(1)) - blocks.append(Range(tok, streamoffset, blocksize)) + blocks.append(Range(tok, streamoffset, blocksize, 0)) streamoffset += blocksize else: state = SEGMENTS diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py index 3a42aa0101..afc202e180 100644 --- a/sdk/python/arvados/stream.py +++ b/sdk/python/arvados/stream.py @@ -35,7 +35,7 @@ class StreamReader(object): s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok) if s: blocksize = long(s.group(1)) - self._data_locators.append(Range(tok, streamoffset, blocksize)) + self._data_locators.append(Range(tok, streamoffset, blocksize, 0)) streamoffset += blocksize continue @@ -45,7 +45,7 @@ class StreamReader(object): size = long(s.group(2)) name = s.group(3).replace('\\040', ' ') if name not in self._files: - self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name) + self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name) else: filereader = self._files[name] filereader.segments.append(Range(pos, filereader.size(), size)) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index 5bb21c60ea..83b4710da8 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -61,40 +61,52 @@ class InodeCache(object): self._entries = collections.OrderedDict() self._counter = itertools.count(1) self.cap = cap + self._total = 0 + + def _remove(self, obj, clear): + if clear and not obj.clear(): + _logger.warn("Could not clear %s in_use %s", obj, obj.in_use()) + return False + self._total -= obj._cache_size + del self._entries[obj._cache_priority] + _logger.warn("Cleared %s total now %i", obj, self._total) + return True def cap_cache(self): - if len(self._entries) > self.cap: - ent = iter(self._entries) - ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)] - for key in ents: - capobj = self._entries[key] - if capobj.clear(): - _logger.debug("Cleared %s", self._entries[key]) - del self._entries[key] + _logger.warn("total is %i cap is %i", self._total, self.cap) + if self._total > self.cap: + need_gc = False + for key in list(self._entries.keys()): + if self._total < self.cap or len(self._entries) < 4: + break + self._remove(self._entries[key], True) + def manage(self, obj): - obj._cache_priority = next(self._counter) - self._entries[obj._cache_priority] = obj - _logger.debug("Managing %s", obj) - self.cap_cache() + if obj.persisted(): + obj._cache_priority = next(self._counter) + obj._cache_size = obj.objsize() + self._entries[obj._cache_priority] = obj + self._total += obj.objsize() + _logger.warn("Managing %s total now %i", obj, self._total) + self.cap_cache() def touch(self, obj): - if obj._cache_priority in self._entries: - del self._entries[obj._cache_priority] - self.manage(obj) + if obj.persisted(): + if obj._cache_priority in self._entries: + self._remove(obj, False) + self.manage(obj) + _logger.warn("Touched %s (%i) total now %i", obj, obj.objsize(), self._total) def unmanage(self, obj): - if obj._cache_priority in self._entries: - if obj.clear(): - _logger.debug("Cleared %s", obj) - del self._entries[obj._cache_priority] - + if obj.persisted() and obj._cache_priority in self._entries: + self._remove(obj, True) class Inodes(object): """Manage the set of inodes. This is the mapping from a numeric id to a concrete File or Directory object""" - def __init__(self, inode_cache=1000): + def __init__(self, inode_cache=256*1024*1024): self._entries = {} self._counter = itertools.count(llfuse.ROOT_INODE) self._obj_cache = InodeCache(cap=inode_cache) diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py index 8bb810b65b..9da3a5c912 100644 --- a/services/fuse/arvados_fuse/fresh.py +++ b/services/fuse/arvados_fuse/fresh.py @@ -1,6 +1,7 @@ import time import ciso8601 import calendar +import functools def convertTime(t): """Parse Arvados timestamp to unix time.""" @@ -11,6 +12,16 @@ def convertTime(t): except (TypeError, ValueError): return 0 +def use_counter(orig_func): + @functools.wraps(orig_func) + def use_counter_wrapper(self, *args, **kwargs): + try: + self.inc_use() + return orig_func(self, *args, **kwargs) + finally: + self.dec_use() + return use_counter_wrapper + class FreshBase(object): """Base class for maintaining fresh/stale state to determine when to update.""" def __init__(self): @@ -19,6 +30,7 @@ class FreshBase(object): self._last_update = time.time() self._atime = time.time() self._poll_time = 60 + self.use_count = 0 # Mark the value as stale def invalidate(self): @@ -38,3 +50,21 @@ class FreshBase(object): def atime(self): return self._atime + + def persisted(self): + return False + + def clear(self, force=False): + pass + + def in_use(self): + return self.use_count > 0 + + def inc_use(self): + self.use_count += 1 + + def dec_use(self): + self.use_count -= 1 + + def objsize(self): + return 0 diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index cfa81b34b7..08a5168a7c 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -7,7 +7,7 @@ import apiclient import functools from fusefile import StringFile, StreamReaderFile, ObjectFile -from fresh import FreshBase, convertTime +from fresh import FreshBase, convertTime, use_counter from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern @@ -32,16 +32,6 @@ def sanitize_filename(dirty): else: return _disallowed_filename_characters.sub('_', dirty) -def use_counter(orig_func): - @functools.wraps(orig_func) - def use_counter_wrapper(self, *args, **kwargs): - try: - self.inc_use() - return orig_func(self, *args, **kwargs) - finally: - self.dec_use() - return use_counter_wrapper - class Directory(FreshBase): """Generic directory object, backed by a dict. @@ -61,7 +51,6 @@ class Directory(FreshBase): self.inodes = inodes self._entries = {} self._mtime = time.time() - self.use_count = 0 # Overriden by subclasses to implement logic to update the entries dict # when the directory is stale @@ -74,14 +63,8 @@ class Directory(FreshBase): def size(self): return 0 - def in_use(self): - return self.use_count > 0 - - def inc_use(self): - self.use_count += 1 - - def dec_use(self): - self.use_count -= 1 + def persisted(self): + return False def checkupdate(self): if self.stale(): @@ -165,14 +148,12 @@ class Directory(FreshBase): oldentries = self._entries self._entries = {} for n in oldentries: - if isinstance(n, Directory): - if not n.clear(force): - self._entries = oldentries - return False + if not oldentries[n].clear(force): + self._entries = oldentries + return False for n in oldentries: - if isinstance(n, Directory): - llfuse.invalidate_entry(self.inode, str(n)) - self.inodes.del_entry(oldentries[n]) + llfuse.invalidate_entry(self.inode, str(n)) + self.inodes.del_entry(oldentries[n]) llfuse.invalidate_inode(self.inode) self.invalidate() return True @@ -198,6 +179,7 @@ class CollectionDirectory(Directory): else: self.collection_locator = collection self._mtime = 0 + self._manifest_size = 0 def same(self, i): return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator @@ -214,6 +196,8 @@ class CollectionDirectory(Directory): self.update() def new_collection(self, new_collection_object, coll_reader): + self.clear(force=True) + self.collection_object = new_collection_object self._mtime = convertTime(self.collection_object.get('modified_at')) @@ -221,7 +205,6 @@ class CollectionDirectory(Directory): if self.collection_object_file is not None: self.collection_object_file.update(self.collection_object) - self.clear(force=True) for s in coll_reader.all_streams(): cwd = self for part in s.name().split('/'): @@ -229,9 +212,6 @@ class CollectionDirectory(Directory): partname = sanitize_filename(part) if partname not in cwd._entries: cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes)) - # (hack until using new API) - cwd._entries[partname].inc_use() - # end hack cwd = cwd._entries[partname] for k, v in s.files().items(): cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime())) @@ -264,6 +244,9 @@ class CollectionDirectory(Directory): if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]: self.new_collection(new_collection_object, coll_reader) + self._manifest_size = len(coll_reader.manifest_text()) + _logger.debug("%s manifest_size %i", self, self._manifest_size) + self.fresh() return True except arvados.errors.NotFoundError: @@ -295,15 +278,15 @@ class CollectionDirectory(Directory): return super(CollectionDirectory, self).__contains__(k) def invalidate(self): - super(CollectionDirectory, self).invalidate() self.collection_object = None + self.collection_object_file = None + super(CollectionDirectory, self).invalidate() - def clear(self, force=False): - if self.collection_locator is None: - return False - else: - return super(CollectionDirectory, self).clear(force) + def persisted(self): + return (self.collection_locator is not None) + def objsize(self): + return self._manifest_size * 128 class MagicDirectory(Directory): """A special directory that logically contains the set of all extant keep locators. @@ -525,6 +508,11 @@ class ProjectDirectory(Directory): else: return super(ProjectDirectory, self).__contains__(k) + def persisted(self): + return False + + def objsize(self): + return len(self.project_object) * 1024 if self.project_object else 0 class SharedDirectory(Directory): """A special directory that represents users or groups who have shared projects with me.""" diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py index e122d9d2ed..efe31c387c 100644 --- a/services/fuse/arvados_fuse/fusefile.py +++ b/services/fuse/arvados_fuse/fusefile.py @@ -24,14 +24,9 @@ class File(FreshBase): def mtime(self): return self._mtime - def clear(self): - pass + def clear(self, force=False): + return True - def inc_use(self): - pass - - def dec_use(self): - pass class StreamReaderFile(File): """Wraps a StreamFileReader as a file.""" diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount index 9c0ef10e21..3c96a5636c 100755 --- a/services/fuse/bin/arv-mount +++ b/services/fuse/bin/arv-mount @@ -46,7 +46,7 @@ with "--". parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False) parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8") - parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024) + parser.add_argument('--inode-cache', type=int, help="Inode cache size (default 128MiB)", default=128*1024*1024) parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER, dest="exec_args", metavar=('command', 'args', '...', '--'), -- 2.39.5