3198: Manage inode cache based on (approximate) object size instead of object count.

author    Peter Amstutz <peter.amstutz@curoverse.com>  Tue, 14 Apr 2015 19:10:32 +0000 (15:10 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>  Tue, 14 Apr 2015 19:10:32 +0000 (15:10 -0400)

It's only a soft limit on memory usage, but still a big improvement.

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/stream.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/bin/arv-mount

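The mechanism at the heart of the patch is easy to state on its own: entries live in an OrderedDict keyed by a monotonically increasing priority, so iterating from the front visits the least recently managed objects first, and eviction walks that order until the running total of approximate object sizes drops back under a byte cap. A minimal standalone sketch of the policy (SizeCappedCache and its names are illustrative, not code from this patch):

    import collections
    import itertools

    class SizeCappedCache(object):
        def __init__(self, cap):
            self._entries = collections.OrderedDict()
            self._counter = itertools.count(1)
            self.cap = cap        # byte budget; only a soft limit
            self._total = 0       # sum of approximate sizes currently held

        def manage(self, name, size):
            # newest entries get the largest priority number, so the front
            # of the OrderedDict is always the least recently managed entry
            self._entries[next(self._counter)] = (name, size)
            self._total += size
            self.cap_cache()

        def cap_cache(self):
            # evict oldest-first until back under the cap, but always keep
            # at least one entry resident
            while self._total > self.cap and len(self._entries) > 1:
                _, (_, size) = self._entries.popitem(last=False)
                self._total -= size

    cache = SizeCappedCache(cap=128*1024*1024)
    cache.manage("inode-42", 5*1024*1024)   # charges ~5 MiB against the budget

The real InodeCache below additionally skips objects that are not persisted() and refuses to evict anything whose clear() reports it is still in use.
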
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 3129bdf2ff0c890337e4594d9fcf9605745d4b04..ce342b5a41a887ec90cf6a4a979af06cd669e5e7 100644
@@ -703,7 +703,7 @@ class ArvadosFile(object):
                     # segment is past the truncate size, all done
                     break
                 elif size < range_end:
-                    nr = Range(r.locator, r.range_start, size - r.range_start)
+                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
                     nr.segment_offset = r.segment_offset
                     new_segs.append(nr)
                     break
@@ -815,7 +815,7 @@ class ArvadosFile(object):
         """Internal implementation of add_segment."""
         self._modified = True
         for lr in locators_and_ranges(blocks, pos, size):
-            last = self._segments[-1] if self._segments else Range(0, 0, 0)
+            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
             r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
             self._segments.append(r)
 
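Every Range(...) call site in this patch gains an explicit fourth argument. Judging from the fields used above (locator, range_start, range_size, plus the segment_offset that is also assigned by attribute), the constructor now takes segment_offset explicitly rather than defaulting it. A sketch of the implied shape, inferred from usage here rather than quoted from the SDK:

    class Range(object):
        # inferred from the call sites in this patch, not the SDK source
        def __init__(self, locator, range_start, range_size, segment_offset):
            self.locator = locator
            self.range_start = range_start
            self.range_size = range_size
            self.segment_offset = segment_offset
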
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 3d48652dd53afe4eecc3bc35628646e427a8ac73..f03deedb18aece57b374a2e94c0e29d26b42319c 100644
@@ -1381,7 +1381,7 @@ class Collection(RichCollectionBase):
                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                 if block_locator:
                     blocksize = long(block_locator.group(1))
-                    blocks.append(Range(tok, streamoffset, blocksize))
+                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                     streamoffset += blocksize
                 else:
                     state = SEGMENTS
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
index 3a42aa010112a7e52d36319ff15558d7ff6298f0..afc202e1806cd9c5ce32ce4aa8e09777d527be0b 100644
@@ -35,7 +35,7 @@ class StreamReader(object):
             s = re.match(r'^[0-9a-f]{32}\+(\d+)(\+\S+)*$', tok)
             if s:
                 blocksize = long(s.group(1))
-                self._data_locators.append(Range(tok, streamoffset, blocksize))
+                self._data_locators.append(Range(tok, streamoffset, blocksize, 0))
                 streamoffset += blocksize
                 continue
 
@@ -45,7 +45,7 @@ class StreamReader(object):
                 size = long(s.group(2))
                 name = s.group(3).replace('\\040', ' ')
                 if name not in self._files:
-                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size)], name)
+                    self._files[name] = StreamFileReader(self, [Range(pos, 0, size, 0)], name)
                 else:
                     filereader = self._files[name]
-                    filereader.segments.append(Range(pos, filereader.size(), size))
+                    filereader.segments.append(Range(pos, filereader.size(), size, 0))
diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py
index 5bb21c60ea69124eb721dcfc1b8b9ed3b05dcf9a..83b4710da818571280eb6544f3fb462f4d8ce787 100644
@@ -61,40 +61,51 @@ class InodeCache(object):
         self._entries = collections.OrderedDict()
         self._counter = itertools.count(1)
         self.cap = cap
+        self._total = 0
+
+    def _remove(self, obj, clear):
+        if clear and not obj.clear():
+            _logger.warn("Could not clear %s in_use %s", obj, obj.in_use())
+            return False
+        self._total -= obj._cache_size
+        del self._entries[obj._cache_priority]
+        _logger.warn("Cleared %s total now %i", obj, self._total)
+        return True
 
     def cap_cache(self):
-        if len(self._entries) > self.cap:
-            ent = iter(self._entries)
-            ents = [next(ent) for i in xrange(0, len(self._entries) - self.cap)]
-            for key in ents:
-                capobj = self._entries[key]
-                if capobj.clear():
-                    _logger.debug("Cleared %s", self._entries[key])
-                    del self._entries[key]
+        _logger.warn("total is %i cap is %i", self._total, self.cap)
+        if self._total > self.cap:
+            need_gc = False
+            for key in list(self._entries.keys()):
+                if self._total < self.cap or len(self._entries) < 4:
+                    break
+                self._remove(self._entries[key], True)
+
 
     def manage(self, obj):
-        obj._cache_priority = next(self._counter)
-        self._entries[obj._cache_priority] = obj
-        _logger.debug("Managing %s", obj)
-        self.cap_cache()
+        if obj.persisted():
+            obj._cache_priority = next(self._counter)
+            obj._cache_size = obj.objsize()
+            self._entries[obj._cache_priority] = obj
+            self._total += obj._cache_size
+            _logger.debug("Managing %s total now %i", obj, self._total)
+            self.cap_cache()
 
     def touch(self, obj):
-        if obj._cache_priority in self._entries:
-            del self._entries[obj._cache_priority]
-        self.manage(obj)
+        if obj.persisted():
+            if obj._cache_priority in self._entries:
+                self._remove(obj, False)
+            self.manage(obj)
+            _logger.warn("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
 
     def unmanage(self, obj):
-        if obj._cache_priority in self._entries:
-            if obj.clear():
-                _logger.debug("Cleared %s", obj)
-                del self._entries[obj._cache_priority]
-
+        if obj.persisted() and obj._cache_priority in self._entries:
+            self._remove(obj, True)
 
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
 
-    def __init__(self, inode_cache=1000):
+    def __init__(self, inode_cache=256*1024*1024):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
         self._obj_cache = InodeCache(cap=inode_cache)
diff --git a/services/fuse/arvados_fuse/fresh.py b/services/fuse/arvados_fuse/fresh.py
index 8bb810b65bc9dfa776c2ffe64cb3a9e40966965a..9da3a5c912042987cdd6a5f510ff0ebbbd37dabd 100644
@@ -1,6 +1,7 @@
 import time
 import ciso8601
 import calendar
+import functools
 
 def convertTime(t):
     """Parse Arvados timestamp to unix time."""
@@ -11,6 +12,16 @@ def convertTime(t):
     except (TypeError, ValueError):
         return 0
 
+def use_counter(orig_func):
+    @functools.wraps(orig_func)
+    def use_counter_wrapper(self, *args, **kwargs):
+        try:
+            self.inc_use()
+            return orig_func(self, *args, **kwargs)
+        finally:
+            self.dec_use()
+    return use_counter_wrapper
+
 class FreshBase(object):
     """Base class for maintaining fresh/stale state to determine when to update."""
     def __init__(self):
@@ -19,6 +30,7 @@ class FreshBase(object):
         self._last_update = time.time()
         self._atime = time.time()
         self._poll_time = 60
+        self.use_count = 0
 
     # Mark the value as stale
     def invalidate(self):
@@ -38,3 +50,21 @@ class FreshBase(object):
 
     def atime(self):
         return self._atime
+
+    def persisted(self):
+        return False
+
+    def clear(self, force=False):
+        pass
+
+    def in_use(self):
+        return self.use_count > 0
+
+    def inc_use(self):
+        self.use_count += 1
+
+    def dec_use(self):
+        self.use_count -= 1
+
+    def objsize(self):
+        return 0
diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py
index cfa81b34b7a3d78e0b260f748facbc29f04c9897..08a5168a7c7409afab5c988e9a5e18882c0bef98 100644
@@ -7,7 +7,7 @@ import apiclient
 import functools
 
 from fusefile import StringFile, StreamReaderFile, ObjectFile
-from fresh import FreshBase, convertTime
+from fresh import FreshBase, convertTime, use_counter
 
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
@@ -32,16 +32,6 @@ def sanitize_filename(dirty):
     else:
         return _disallowed_filename_characters.sub('_', dirty)
 
-def use_counter(orig_func):
-    @functools.wraps(orig_func)
-    def use_counter_wrapper(self, *args, **kwargs):
-        try:
-            self.inc_use()
-            return orig_func(self, *args, **kwargs)
-        finally:
-            self.dec_use()
-    return use_counter_wrapper
-
 
 class Directory(FreshBase):
     """Generic directory object, backed by a dict.
@@ -61,7 +51,6 @@ class Directory(FreshBase):
         self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
-        self.use_count = 0
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
@@ -74,14 +63,8 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
-    def in_use(self):
-        return self.use_count > 0
-
-    def inc_use(self):
-        self.use_count += 1
-
-    def dec_use(self):
-        self.use_count -= 1
+    def persisted(self):
+        return False
 
     def checkupdate(self):
         if self.stale():
@@ -165,14 +148,12 @@ class Directory(FreshBase):
             oldentries = self._entries
             self._entries = {}
             for n in oldentries:
-                if isinstance(n, Directory):
-                    if not n.clear(force):
-                        self._entries = oldentries
-                        return False
+                if not oldentries[n].clear(force):
+                    self._entries = oldentries
+                    return False
             for n in oldentries:
-                if isinstance(n, Directory):
-                    llfuse.invalidate_entry(self.inode, str(n))
-                    self.inodes.del_entry(oldentries[n])
+                llfuse.invalidate_entry(self.inode, str(n))
+                self.inodes.del_entry(oldentries[n])
             llfuse.invalidate_inode(self.inode)
             self.invalidate()
             return True
@@ -198,6 +179,7 @@ class CollectionDirectory(Directory):
         else:
             self.collection_locator = collection
             self._mtime = 0
+        self._manifest_size = 0
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
@@ -214,6 +196,8 @@ class CollectionDirectory(Directory):
         self.update()
 
     def new_collection(self, new_collection_object, coll_reader):
+        self.clear(force=True)
+
         self.collection_object = new_collection_object
 
         self._mtime = convertTime(self.collection_object.get('modified_at'))
@@ -221,7 +205,6 @@ class CollectionDirectory(Directory):
         if self.collection_object_file is not None:
             self.collection_object_file.update(self.collection_object)
 
-        self.clear(force=True)
         for s in coll_reader.all_streams():
             cwd = self
             for part in s.name().split('/'):
@@ -229,9 +212,6 @@ class CollectionDirectory(Directory):
                     partname = sanitize_filename(part)
                     if partname not in cwd._entries:
                         cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
-                        # (hack until using new API)
-                        cwd._entries[partname].inc_use()
-                        # end hack
                     cwd = cwd._entries[partname]
             for k, v in s.files().items():
                 cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
@@ -264,6 +244,9 @@ class CollectionDirectory(Directory):
             if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
                 self.new_collection(new_collection_object, coll_reader)
 
+            self._manifest_size = len(coll_reader.manifest_text())
+            _logger.debug("%s manifest_size %i", self, self._manifest_size)
+
             self.fresh()
             return True
         except arvados.errors.NotFoundError:
@@ -295,15 +278,15 @@ class CollectionDirectory(Directory):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        super(CollectionDirectory, self).invalidate()
         self.collection_object = None
+        self.collection_object_file = None
+        super(CollectionDirectory, self).invalidate()
 
-    def clear(self, force=False):
-        if self.collection_locator is None:
-            return False
-        else:
-            return super(CollectionDirectory, self).clear(force)
+    def persisted(self):
+        return (self.collection_locator is not None)
 
+    def objsize(self):
+        return self._manifest_size * 128  # rough proxy for the in-memory object graph
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -525,6 +508,11 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    def persisted(self):
+        return False
+
+    def objsize(self):
+        return len(self.project_object) * 1024 if self.project_object else 0  # rough per-field estimate
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
diff --git a/services/fuse/arvados_fuse/fusefile.py b/services/fuse/arvados_fuse/fusefile.py
index e122d9d2edc2118143fc7e78310dc69cb4fff8ce..efe31c387c09432f3905144c19b5417161d6a1a7 100644
@@ -24,14 +24,9 @@ class File(FreshBase):
     def mtime(self):
         return self._mtime
 
-    def clear(self):
-        pass
+    def clear(self, force=False):
+        return True
 
-    def inc_use(self):
-        pass
-
-    def dec_use(self):
-        pass
 
 class StreamReaderFile(File):
     """Wraps a StreamFileReader as a file."""
diff --git a/services/fuse/bin/arv-mount b/services/fuse/bin/arv-mount
index 9c0ef10e21128a23bcdafd9d9de51b5e49b5198b..3c96a5636c8f558f01254ef44a69a7a502ca14fd 100755
@@ -46,7 +46,7 @@ with "--".
     parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
     parser.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
 
-    parser.add_argument('--inode-cache', type=int, help="Inode cache size", default=1024)
+    parser.add_argument('--inode-cache', type=int, help="Inode cache size in bytes (default 128 MiB)", default=128*1024*1024)
 
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),