Merge branch '3198-inode-cache' into 3198-writable-fuse, fix tests.
author Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 7 May 2015 21:11:51 +0000 (17:11 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Thu, 7 May 2015 21:11:51 +0000 (17:11 -0400)
Conflicts:
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/tests/test_mount.py

1  2 
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/setup.py
services/fuse/tests/test_inodes.py
services/fuse/tests/test_mount.py

index b3992ef6960117aec5646e2f6b419226dd0c4c67,2387c699c1e3463bb4846c81471e681664f22728..1b44eafabb86d0d5775dc576b451fe003682600d
@@@ -67,9 -52,13 +67,9 @@@ class DirectoryHandle(Handle)
  
  
  class InodeCache(object):
-     def __init__(self, cap):
+     def __init__(self, cap, min_entries=4):
          self._entries = collections.OrderedDict()
 +        self._by_uuid = {}
          self._counter = itertools.count(1)
          self.cap = cap
          self._total = 0
          if clear and not obj.clear():
              _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
              return False
-         self._total -= obj._cache_size
-         del self._entries[obj._cache_priority]
-         if obj._cache_uuid:
-             del self._by_uuid[obj._cache_uuid]
-             obj._cache_uuid = None
+         self._total -= obj.cache_size
+         del self._entries[obj.cache_priority]
++        if obj.cache_uuid:
++            del self._by_uuid[obj.cache_uuid]
++            obj.cache_uuid = None
          _logger.debug("Cleared %s total now %i", obj, self._total)
          return True
  
  
      def manage(self, obj):
          if obj.persisted():
-             obj._cache_priority = next(self._counter)
-             obj._cache_size = obj.objsize()
-             self._entries[obj._cache_priority] = obj
-             if obj.uuid():
-                 obj._cache_uuid = obj.uuid()
-                 self._by_uuid[obj._cache_uuid] = obj
+             obj.cache_priority = next(self._counter)
+             obj.cache_size = obj.objsize()
+             self._entries[obj.cache_priority] = obj
++            obj.cache_uuid = obj.uuid()
++            if obj.cache_uuid:
++                self._by_uuid[obj.cache_uuid] = obj
              self._total += obj.objsize()
              _logger.debug("Managing %s total now %i", obj, self._total)
              self.cap_cache()
-             obj._cache_priority = None
 +        else:
++            obj.cache_priority = None
  
      def touch(self, obj):
          if obj.persisted():
              _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
  
      def unmanage(self, obj):
-         if obj.persisted() and obj._cache_priority in self._entries:
+         if obj.persisted() and obj.cache_priority in self._entries:
              self._remove(obj, True)
  
 +    def find(self, uuid):
 +        return self._by_uuid.get(uuid)
 +
  class Inodes(object):
      """Manage the set of inodes.  This is the mapping from a numeric id
      to a concrete File or Directory object"""
          return entry
  
      def del_entry(self, entry):
 -        self.inode_cache.unmanage(entry)
 -        llfuse.invalidate_inode(entry.inode)
 -        del self._entries[entry.inode]
 +        if entry.ref_count == 0:
 +            _logger.warn("Deleting inode %i", entry.inode)
-             self.cache.unmanage(entry)
++            self.inode_cache.unmanage(entry)
 +            llfuse.invalidate_inode(entry.inode)
 +            del self._entries[entry.inode]
 +        else:
 +            _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
 +            entry.dead = True
 +
 +def catch_exceptions(orig_func):
 +    @functools.wraps(orig_func)
 +    def catch_exceptions_wrapper(self, *args, **kwargs):
 +        try:
 +            return orig_func(self, *args, **kwargs)
 +        except llfuse.FUSEError:
 +            raise
 +        except EnvironmentError as e:
 +            raise llfuse.FUSEError(e.errno)
 +        except:
 +            _logger.exception("Unhandled exception during FUSE operation")
 +            raise llfuse.FUSEError(errno.EIO)
 +
 +    return catch_exceptions_wrapper
  
  
  class Operations(llfuse.Operations):
  
      """
  
-     def __init__(self, uid, gid, encoding="utf-8", inode_cache=1000, num_retries=7):
 -    def __init__(self, uid, gid, encoding="utf-8", inode_cache=InodeCache(cap=256*1024*1024)):
++    def __init__(self, uid, gid,
++                 encoding="utf-8",
++                 inode_cache=InodeCache(cap=256*1024*1024),
++                 num_retries=4):
          super(Operations, self).__init__()
  
          self.inodes = Inodes(inode_cache)
      def access(self, inode, mode, ctx):
          return True
  
-                 item = self.inodes.cache.find(ev["object_uuid"])
 +    def listen_for_events(self, api_client):
 +        self.event = arvados.events.subscribe(api_client,
 +                                 [["event_type", "in", ["create", "update", "delete"]]],
 +                                 self.on_event)
 +
 +    def on_event(self, ev):
 +        if 'event_type' in ev:
 +            with llfuse.lock:
-                 itemparent = self.inodes.cache.find(ev["object_owner_uuid"])
++                item = self.inodes.inode_cache.find(ev["object_uuid"])
 +                if item:
 +                    item.invalidate()
 +                    item.update()
 +
++                itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
 +                if itemparent:
 +                    itemparent.invalidate()
 +                    itemparent.update()
 +
 +    @catch_exceptions
      def getattr(self, inode):
          if inode not in self.inodes:
              raise llfuse.FUSEError(errno.ENOENT)
          except arvados.errors.NotFoundError as e:
              _logger.warning("Block not found: " + str(e))
              raise llfuse.FUSEError(errno.EIO)
 -        except Exception:
 -            _logger.exception()
 -            raise llfuse.FUSEError(errno.EIO)
  
 +    @catch_exceptions
 +    def write(self, fh, off, buf):
 +        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
 +        if fh in self._filehandles:
 +            handle = self._filehandles[fh]
 +        else:
 +            raise llfuse.FUSEError(errno.EBADF)
 +
 +        if not handle.obj.writable():
 +            raise llfuse.FUSEError(errno.EPERM)
 +
 +        self.inodes.touch(handle.obj)
 +
 +        with llfuse.lock_released:
 +            return handle.obj.writeto(off, buf, self.num_retries)
 +
 +    @catch_exceptions
      def release(self, fh):
          if fh in self._filehandles:
 +            try:
 +                self._filehandles[fh].flush()
 +            except EnvironmentError as e:
 +                raise llfuse.FUSEError(e.errno)
 +            except Exception:
 +                _logger.exception("Flush error")
              self._filehandles[fh].release()
              del self._filehandles[fh]
-         self.inodes.cache.cap_cache()
+         self.inodes.inode_cache.cap_cache()
  
      def releasedir(self, fh):
          self.release(fh)
index e185fbaa9c1f44769f9bec885692238c55ad1b9e,5acadfdf7a4fb9b41b6f8c32515a8675cc7e9adf..aeb8f737c51ba9e82ceea9b171344720666e64ce
@@@ -31,8 -31,8 +31,11 @@@ class FreshBase(object)
          self._atime = time.time()
          self._poll_time = 60
          self.use_count = 0
 -        self.cache_priority = 0
 +        self.ref_count = 0
 +        self.dead = False
++        self.cache_priority = None
+         self.cache_size = 0
++        self.cache_uuid = None
  
      # Mark the value as stale
      def invalidate(self):
index d77fba48b2b4d513131aecd8808491b4f91b69ee,77e8dde0a538634b201ddfb7ff605216ab36d0ad..898af31d263904d73dcd3583ac0c9f83cf54eb4b
@@@ -276,35 -226,27 +276,35 @@@ class CollectionDirectory(CollectionDir
                  return True
  
              with llfuse.lock_released:
 -                coll_reader = arvados.CollectionReader(
 -                    self.collection_locator, self.api, self.api.keep,
 -                    num_retries=self.num_retries)
 -                new_collection_object = coll_reader.api_response() or {}
 -                # If the Collection only exists in Keep, there will be no API
 -                # response.  Fill in the fields we need.
 -                if 'uuid' not in new_collection_object:
 -                    new_collection_object['uuid'] = self.collection_locator
 -                if "portable_data_hash" not in new_collection_object:
 -                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
 -                if 'manifest_text' not in new_collection_object:
 -                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
 -                coll_reader.normalize()
 +                _logger.debug("Updating %s", self.collection_locator)
 +                if self.collection:
 +                    self.collection.update()
 +                else:
 +                    if uuid_pattern.match(self.collection_locator):
 +                        coll_reader = arvados.collection.Collection(
 +                            self.collection_locator, self.api, self.api.keep,
 +                            num_retries=self.num_retries)
 +                    else:
 +                        coll_reader = arvados.collection.CollectionReader(
 +                            self.collection_locator, self.api, self.api.keep,
 +                            num_retries=self.num_retries)
 +                    new_collection_record = coll_reader.api_response() or {}
 +                    # If the Collection only exists in Keep, there will be no API
 +                    # response.  Fill in the fields we need.
 +                    if 'uuid' not in new_collection_record:
 +                        new_collection_record['uuid'] = self.collection_locator
 +                    if "portable_data_hash" not in new_collection_record:
 +                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
 +                    if 'manifest_text' not in new_collection_record:
 +                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
-             # end with llfuse.lock_released, re-acquire lock
 +
-             if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record["portable_data_hash"]:
-                 self.new_collection(new_collection_record, coll_reader)
++                    if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
++                        self.new_collection(new_collection_record, coll_reader)
 +
-             self._manifest_size = len(coll_reader.manifest_text())
-             _logger.debug("%s manifest_size %i", self, self._manifest_size)
++                    self._manifest_size = len(coll_reader.manifest_text())
++                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
+             # end with llfuse.lock_released, re-acquire lock
  
 -            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
 -                self.new_collection(new_collection_object, coll_reader)
 -
 -            self._manifest_size = len(coll_reader.manifest_text())
 -            _logger.debug("%s manifest_size %i", self, self._manifest_size)
 -
              self.fresh()
              return True
          except arvados.errors.NotFoundError:
Simple merge
index 0000000000000000000000000000000000000000,82bb7164a9398b2dbde75d67f9e069f8d41c8bcc..2c5f3dcef89efec4c265753f5895b99a14daca5e
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,87 +1,88 @@@
+ import arvados_fuse
+ import mock
+ import unittest
+ class InodeTests(unittest.TestCase):
+     def test_inodes(self):
+         cache = arvados_fuse.InodeCache(1000, 4)
+         inodes = arvados_fuse.Inodes(cache)
+         # Check that ent1 gets added to inodes
+         ent1 = mock.MagicMock()
+         ent1.return_value.in_use = False
+         ent1.persisted.return_value = True
+         ent1.clear.return_value = True
+         ent1.objsize.return_value = 500
+         inodes.add_entry(ent1)
+         self.assertIn(ent1.inode, inodes)
+         self.assertIs(inodes[ent1.inode], ent1)
+         self.assertEqual(500, cache.total())
+         # ent2 is not persisted, so it doesn't
+         # affect the cache total
+         ent2 = mock.MagicMock()
+         ent2.return_value.in_use = False
+         ent2.persisted.return_value = False
+         ent2.objsize.return_value = 600
+         inodes.add_entry(ent2)
+         self.assertEqual(500, cache.total())
+         # ent3 is persisted, adding it should cause ent1 to get cleared
+         ent3 = mock.MagicMock()
+         ent3.return_value.in_use = False
+         ent3.persisted.return_value = True
+         ent3.objsize.return_value = 600
+         ent3.clear.return_value = True
+         self.assertFalse(ent1.clear.called)
+         inodes.add_entry(ent3)
+         # Won't clear anything because min_entries = 4
+         self.assertEqual(2, len(cache._entries))
+         self.assertFalse(ent1.clear.called)
+         self.assertEqual(1100, cache.total())
+         # Change min_entries
+         cache.min_entries = 1
+         cache.cap_cache()
+         self.assertEqual(600, cache.total())
+         self.assertTrue(ent1.clear.called)
+         # Touching ent1 should cause ent3 to get cleared
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent1)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(500, cache.total())
+         # ent1, ent3 clear return false, can't be cleared
+         ent1.clear.return_value = False
+         ent3.clear.return_value = False
+         ent1.clear.called = False
+         ent3.clear.called = False
+         self.assertFalse(ent1.clear.called)
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent3)
+         self.assertTrue(ent1.clear.called)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(1100, cache.total())
+         # ent1 clear return false, so ent3
+         # gets cleared
+         ent1.clear.return_value = False
+         ent3.clear.return_value = True
+         ent1.clear.called = False
+         ent3.clear.called = False
+         self.assertFalse(ent1.clear.called)
+         self.assertFalse(ent3.clear.called)
+         cache.touch(ent3)
+         self.assertTrue(ent1.clear.called)
+         self.assertTrue(ent3.clear.called)
+         self.assertEqual(500, cache.total())
+         # Delete ent1
+         ent1.clear.return_value = True
++        ent1.ref_count = 0
+         inodes.del_entry(ent1)
+         self.assertEqual(0, cache.total())
+         cache.touch(ent3)
+         self.assertEqual(600, cache.total())
index 9a17957767c60edff25db3a2ede2d4c3e7ea4e5f,5535494a67b77089d233544e2c2c1e219eedc622..c7d63873cdd7ae3ddb851d8afc7f053d13e0086a
@@@ -28,14 -25,13 +28,14 @@@ class MountTestBase(unittest.TestCase)
          self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
  
      def make_mount(self, root_class, **root_kwargs):
-         self.operations = fuse.Operations(os.getuid(), os.getgid(), inode_cache=2)
 -        operations = fuse.Operations(os.getuid(), os.getgid())
 -        operations.inodes.add_entry(root_class(
 -            llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
 -        llfuse.init(operations, self.mounttmp, [])
++        self.operations = fuse.Operations(os.getuid(), os.getgid())
 +        self.operations.inodes.add_entry(root_class(
 +            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
-         llfuse.init(self.operations, self.mounttmp, ['debug'])
++        llfuse.init(self.operations, self.mounttmp, [])
          threading.Thread(None, llfuse.main).start()
          # wait until the driver is finished initializing
 -        operations.initlock.wait()
 +        self.operations.initlock.wait()
 +        return self.operations.inodes[llfuse.ROOT_INODE]
  
      def tearDown(self):
          # llfuse.close is buggy, so use fusermount instead.