Improved fresh/stale handling with base class, added proper fuse inode cache
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 May 2014 15:25:34 +0000 (11:25 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 May 2014 15:25:34 +0000 (11:25 -0400)
invalidation.  Added common merge function for handling directory updates.
arv-mount now daemonizes by default.

sdk/python/arvados/fuse/__init__.py
sdk/python/bin/arv-mount
sdk/python/requirements.txt
sdk/python/setup_fuse.py.src
sdk/python/test_mount.py

index da1582cd044163549e516cdc39d83c8a9e4433ba..15fbbc93cabd45ef46fe6b3031fa2e791d5ab17d 100644 (file)
@@ -13,21 +13,49 @@ import arvados
 import pprint
 import arvados.events
 import re
+import apiclient
+import json
 
 from time import time
 from llfuse import FUSEError
 
-class File(object):
-    '''Wraps a StreamFileReader for use by Directory.'''
+class FreshBase(object):
+    '''Base class for maintaining fresh/stale state to determine when to update.'''
+    def __init__(self):
+        self._stale = True
+        self._poll = False
+        self._last_update = time()
+        self._poll_time = 60  # seconds; only consulted when _poll is True
+
+    def invalidate(self):
+        '''Mark the value as stale; stale() then returns True until fresh().'''
+        self._stale = True
+
+    def stale(self):
+        '''True if invalidated, or if polling and _poll_time has elapsed.'''
+        if self._stale:
+            return True
+        if self._poll:
+            return (self._last_update + self._poll_time) < time()
+        return False
+
+    def fresh(self):
+        self._stale = False
+        self._last_update = time()  # mark up to date; restarts the poll clock
+
+
+class File(FreshBase):
+    '''Base for file objects.'''
 
     def __init__(self, parent_inode):
+        super(File, self).__init__()
         self.inode = None
         self.parent_inode = parent_inode
 
     def size(self):
         return 0
 
-    def readfrom(self, off, size)):
+    def readfrom(self, off, size):
         return ''
 
 
@@ -35,22 +63,27 @@ class StreamReaderFile(File):
     '''Wraps a StreamFileReader as a file.'''
 
     def __init__(self, parent_inode, reader):
-        super(StreamReaderFile, self).__init__(parent_inode, reader)
+        super(StreamReaderFile, self).__init__(parent_inode)
         self.reader = reader
 
     def size(self):
         return self.reader.size()
 
-    def readfrom(self, off, size)):
+    def readfrom(self, off, size):
         return self.reader.readfrom(off, size)
 
+    def stale(self):
+        return False
+
 
 class ObjectFile(File):
-    '''Wraps a serialized object as a file.'''
+    '''Wraps a dict as a serialized json object.'''
 
     def __init__(self, parent_inode, contents):
-        super(ObjectFile, self).__init__(parent_inode, reader)
-        self.contents = contents
+        super(ObjectFile, self).__init__(parent_inode)
+        self.contentsdict = contents
+        self.uuid = self.contentsdict['uuid']
+        self.contents = json.dumps(self.contentsdict, indent=4, sort_keys=True)
 
     def size(self):
         return len(self.contents)
@@ -59,70 +92,83 @@ class ObjectFile(File):
         return self.contents[off:(off+size)]
 
 
-class Directory(object):
+class Directory(FreshBase):
     '''Generic directory object, backed by a dict.
     Consists of a set of entries with the key representing the filename
     and the value referencing a File or Directory object.
     '''
 
     def __init__(self, parent_inode):
+        super(Directory, self).__init__()
+
         '''parent_inode is the integer inode number'''
         self.inode = None
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
         self._entries = {}
-        self._stale = True
-        self._poll = False
-        self._last_update = time()
-        self._poll_time = 60
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
     def update(self):
         pass
 
-    # Mark the entries dict as stale
-    def invalidate(self):
-        self._stale = True
-
-    # Test if the entries dict is stale
-    def stale(self):
-        if self._stale:
-            return True
-        if self._poll:
-            return (self._last_update + self._poll_time) < time()
-        return False
-
-    def fresh(self):
-        self._stale = False
-        self._last_update = time()
-
     # Only used when computing the size of the disk footprint of the directory
     # (stub)
     def size(self):
         return 0
 
-    def __getitem__(self, item):
+    def checkupdate(self):
         if self.stale():
-            self.update()
+            try:
+                self.update()
+            except apiclient.errors.HttpError as e:
+                print e
+
+    def __getitem__(self, item):
+        self.checkupdate()
         return self._entries[item]
 
     def items(self):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return self._entries.items()
 
     def __iter__(self):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return self._entries.iterkeys()
 
     def __contains__(self, k):
-        if self.stale():
-            self.update()
+        self.checkupdate()
         return k in self._entries
 
+    def merge(self, items, fn, same, new_entry):
+        '''Helper method for updating the contents of the directory.
+
+        items: array with new directory contents
+        fn: function mapping an item to its file or directory name (if several
+        items map to the same name, only the first one is used)
+        same: function(existing_entry, item) deciding whether to keep the entry
+        new_entry: function to create a new entry from an item; it may return
+        None, in which case the item is skipped
+        '''
+        oldentries = self._entries
+        self._entries = {}
+        for i in items:
+            n = fn(i)
+            if n in self._entries:
+                continue
+            if n in oldentries and same(oldentries[n], i):
+                self._entries[n] = oldentries.pop(n)
+            else:
+                d = new_entry(i)
+                if d is not None:
+                    self._entries[n] = self.inodes.add_entry(d)
+        # Anything left in oldentries is gone or was replaced; drop its inode.
+        for n in oldentries:
+            llfuse.invalidate_entry(self.inode, str(n))
+            self.inodes.del_entry(oldentries[n])
+        self.fresh()
+
 
 class CollectionDirectory(Directory):
     '''Represents the root of a directory tree holding a collection.'''
@@ -132,6 +178,9 @@ class CollectionDirectory(Directory):
         self.inodes = inodes
         self.collection_locator = collection_locator
 
+    def same(self, i):
+        return i['uuid'] == self.collection_locator
+
     def update(self):
         collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
         for s in collection.all_streams():
@@ -199,16 +248,10 @@ class TagsDirectory(Directory):
 
     def update(self):
         tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
-        oldentries = self._entries
-        self._entries = {}
-        for n in tags['items']:
-            n = n['name']
-            if n in oldentries:
-                self._entries[n] = oldentries[n]
-            else:
-                self._entries[n] = self.inodes.add_entry(TagDirectory(self.inode, self.inodes, self.api, n, poll=self._poll, poll_time=self._poll_time))
-        self.fresh()
-
+        self.merge(tags['items'],
+                   lambda i: i['name'],
+                   lambda a, i: a.tag == i,
+                   lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
 
 class TagDirectory(Directory):
     '''A special directory that contains as subdirectories all collections visible
@@ -224,19 +267,14 @@ class TagDirectory(Directory):
         self._poll_time = poll_time
 
     def update(self):
-        collections = self.api.links().list(filters=[['link_class', '=', 'tag'],
+        taggedcollections = self.api.links().list(filters=[['link_class', '=', 'tag'],
                                                ['name', '=', self.tag],
                                                ['head_uuid', 'is_a', 'arvados#collection']],
                                       select=['head_uuid']).execute()
-        oldentries = self._entries
-        self._entries = {}
-        for c in collections['items']:
-            n = c['head_uuid']
-            if n in oldentries:
-                self._entries[n] = oldentries[n]
-            else:
-                self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
-        self.fresh()
+        self.merge(taggedcollections['items'],
+                   lambda i: i['head_uuid'],
+                   lambda a, i: a.collection_locator == i['head_uuid'],
+                   lambda i: CollectionDirectory(self.inode, self.inodes, i['head_uuid']))
 
 
 class GroupsDirectory(Directory):
@@ -260,16 +298,10 @@ class GroupsDirectory(Directory):
 
     def update(self):
         groups = self.api.groups().list().execute()
-        oldentries = self._entries
-        self._entries = {}
-        for n in groups['items']:
-            id = n['name']
-            if id in oldentries and oldentries[id].uuid == n['uuid']:
-                self._entries[id] = oldentries[id]
-            else:
-                self._entries[id] = self.inodes.add_entry(GroupDirectory(self.inode, self.inodes, self.api,
-                                                                         n['uuid'], poll=self._poll, poll_time=self._poll_time))
-        self.fresh()
+        self.merge(groups['items'],
+                   lambda i: i['uuid'],
+                   lambda a, i: a.uuid == i['uuid'],
+                   lambda i: GroupDirectory(self.inode, self.inodes, self.api, i, poll=self._poll, poll_time=self._poll_time))
 
 
 class GroupDirectory(Directory):
@@ -279,7 +311,7 @@ class GroupDirectory(Directory):
         super(GroupDirectory, self).__init__(parent_inode)
         self.inodes = inodes
         self.api = api
-        self.uuid = uuid
+        self.uuid = uuid['uuid']
         self._poll = poll
         self._poll_time = poll_time
 
@@ -289,38 +321,38 @@ class GroupDirectory(Directory):
             for a in self._entries:
                 self._entries[a].invalidate()
 
-    def createDirectory(self, parent_inode, inodes, api, uuid, poll, poll_time):
-        print uuid
+    def createDirectory(self, i):
         if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
-            return CollectionDirectory(parent_inode, inodes, i['uuid'])
-        if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
-            return ObjectFile(parent_inode, inodes, json.dumps(i))
+            return CollectionDirectory(self.inode, self.inodes, i['uuid'])
+        elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', i['uuid']):
+            return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)  # child of this dir
+        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
+            return ObjectFile(self.inode, i)  # child of this dir, not of our parent
         return None
 
     def update(self):
-        contents = self.api.groups().contents(uuid=self.uuid).execute()
+        contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
         links = {}
         for a in contents['links']:
             links[a['head_uuid']] = a['name']
 
-        oldentries = self._entries
-        self._entries = {}
-
-        for i in contents['items']:
+        def choose_name(i):
             if i['uuid'] in links:
-                n = links[i['uuid']]
-            elif 'name' in i and len(i['name']) > 0:
-                n = i['name']
+                return links[i['uuid']]
             else:
-                n = i['uuid']
+                return i['uuid']
 
-            if n in oldentries and oldentries[n].uuid == i['uuid']:
-                self._entries[n] = oldentries[n]
-            else:
-                d = self.createDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
-                if d != None:
-                    self._entries[n] = self.inodes.add_entry(d)
-        self.fresh()
+        def same(a, i):
+            if isinstance(a, CollectionDirectory):
+                return a.collection_locator == i['uuid']
+            elif isinstance(a, ObjectFile):
+                return a.uuid == i['uuid'] and not a.stale()
+            return False
+
+        self.merge(contents['items'],
+                   choose_name,
+                   same,
+                   self.createDirectory)
 
 
 class FileHandle(object):
@@ -361,6 +393,10 @@ class Inodes(object):
         self._counter += 1
         return entry
 
+    def del_entry(self, entry):
+        llfuse.invalidate_inode(entry.inode)
+        del self._entries[entry.inode]
+
 class Operations(llfuse.Operations):
     '''This is the main interface with llfuse.  The methods on this object are
     called by llfuse threads to service FUSE events to query and read from
@@ -394,6 +430,9 @@ class Operations(llfuse.Operations):
         return True
 
     def getattr(self, inode):
+        if inode not in self.inodes:
+            raise llfuse.FUSEError(errno.ENOENT)
+
         e = self.inodes[inode]
 
         entry = llfuse.EntryAttributes()
@@ -511,7 +550,8 @@ class Operations(llfuse.Operations):
 
         e = off
         while e < len(handle.entry):
-            yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+            if handle.entry[e][1].inode in self.inodes:
+                yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
             e += 1
 
     def releasedir(self, fh):
index 849af3b58649e59cf8a12a26f4bc34af01e75d1a..fc5491ff68ccb68c16745ee9032c41d96fe30c4f 100755 (executable)
@@ -4,6 +4,7 @@ from arvados.fuse import *
 import arvados
 import subprocess
 import argparse
+import daemon
 
 if __name__ == '__main__':
     # Handle command line parameters
@@ -24,6 +25,7 @@ with "--".
 collections on the server.""")
     parser.add_argument('--groups', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing groups on the server.""")
     parser.add_argument('--debug', action='store_true', help="""Debug mode""")
+    parser.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),
                         help="""Mount, run a command, then unmount and exit""")
@@ -53,10 +55,10 @@ collections on the server.""")
     if args.debug:
         opts += ['debug']
 
-    # Initialize the fuse connection
-    llfuse.init(operations, args.mountpoint, opts)
-
     if args.exec_args:
+        # Initialize the fuse connection
+        llfuse.init(operations, args.mountpoint, opts)
+
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
 
@@ -76,4 +78,12 @@ collections on the server.""")
 
         exit(rc)
     else:
-        llfuse.main()
+        if args.foreground:
+            # Initialize the fuse connection
+            llfuse.init(operations, args.mountpoint, opts)
+            llfuse.main()
+        else:
+            with daemon.DaemonContext():
+                # Initialize the fuse connection
+                llfuse.init(operations, args.mountpoint, opts)
+                llfuse.main()
index 1a8219b91fa6d8bee89ddf86e91b195f7ebae0f5..a6a7591c8d11c8cdb77c5dffe9a34de55daa8355 100644 (file)
@@ -5,3 +5,4 @@ urllib3==1.7.1
 llfuse==0.40
 ws4py==0.3.4
 PyYAML==3.11
+python-daemon==1.6
index 9e191fbb2416cbeff07f6417f563209e2e427d74..9628712fdc9ec4918eaa3be83b3e5d4daec9d6a6 100644 (file)
@@ -22,6 +22,7 @@ setup(name='arvados-fuse-driver',
         ],
       install_requires=[
         'arvados-python-client',
-       'llfuse'
+       'llfuse',
+        'python-daemon'
         ],
       zip_safe=False)
index 9251f6976319ad0cba68262037c4b7ba5acebea9..06252af9e6af09ea2a4ff95ab4046aac89549720 100644 (file)
@@ -236,7 +236,7 @@ class FuseTagsUpdateTestBase(MountTestBase):
         d3.sort()
         self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d3)
 
-        api.links().create(body={'link': {
+        l = api.links().create(body={'link': {
             'head_uuid': 'ea10d51bcf88862dbcc36eb292017dfd+45',
             'link_class': 'tag',
             'name': 'bar_tag'
@@ -248,6 +248,14 @@ class FuseTagsUpdateTestBase(MountTestBase):
         d4.sort()
         self.assertEqual(['ea10d51bcf88862dbcc36eb292017dfd+45', 'fa7aeb5140e2848d39b416daeef4ffc5+45'], d4)
 
+        api.links().delete(uuid=l['uuid']).execute()
+
+        time.sleep(1)
+
+        d5 = os.listdir(os.path.join(self.mounttmp, 'bar_tag'))
+        d5.sort()
+        self.assertEqual(['fa7aeb5140e2848d39b416daeef4ffc5+45'], d5)
+
 
 class FuseTagsUpdateTestWebsockets(FuseTagsUpdateTestBase):
     def setUp(self):