Adding support for groups and viewing arvados objects as files (serialized
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 May 2014 02:42:55 +0000 (22:42 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 6 May 2014 02:42:55 +0000 (22:42 -0400)
json).  Work in progress.

sdk/python/arvados/fuse/__init__.py
sdk/python/bin/arv-mount

index cc35d584bd5332e2cda93bca76384ddbd35fd3d6..da1582cd044163549e516cdc39d83c8a9e4433ba 100644 (file)
@@ -12,10 +12,53 @@ import threading
 import arvados
 import pprint
 import arvados.events
+import re
 
 from time import time
 from llfuse import FUSEError
 
+class File(object):
+    '''Wraps a StreamFileReader for use by Directory.'''
+
+    def __init__(self, parent_inode):
+        self.inode = None
+        self.parent_inode = parent_inode
+
+    def size(self):
+        return 0
+
+    def readfrom(self, off, size)):
+        return ''
+
+
+class StreamReaderFile(File):
+    '''Wraps a StreamFileReader as a file.'''
+
+    def __init__(self, parent_inode, reader):
+        super(StreamReaderFile, self).__init__(parent_inode, reader)
+        self.reader = reader
+
+    def size(self):
+        return self.reader.size()
+
+    def readfrom(self, off, size)):
+        return self.reader.readfrom(off, size)
+
+
+class ObjectFile(File):
+    '''Wraps a serialized object as a file.'''
+
+    def __init__(self, parent_inode, contents):
+        super(ObjectFile, self).__init__(parent_inode, reader)
+        self.contents = contents
+
+    def size(self):
+        return len(self.contents)
+
+    def readfrom(self, off, size):
+        return self.contents[off:(off+size)]
+
+
 class Directory(object):
     '''Generic directory object, backed by a dict.
     Consists of a set of entries with the key representing the filename
@@ -99,9 +142,10 @@ class CollectionDirectory(Directory):
                         cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
                     cwd = cwd._entries[part]
             for k, v in s.files().items():
-                cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
+                cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
         self.fresh()
 
+
 class MagicDirectory(Directory):
     '''A special directory that logically contains the set of all extant keep
     locators.  When a file is referenced by lookup(), it is tested to see if it
@@ -154,7 +198,7 @@ class TagsDirectory(Directory):
                 self._entries[a].invalidate()
 
     def update(self):
-        tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
+        tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
         oldentries = self._entries
         self._entries = {}
         for n in tags['items']:
@@ -194,16 +238,89 @@ class TagDirectory(Directory):
                 self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
         self.fresh()
 
-class File(object):
-    '''Wraps a StreamFileReader for use by Directory.'''
 
-    def __init__(self, parent_inode, reader):
-        self.inode = None
-        self.parent_inode = parent_inode
-        self.reader = reader
+class GroupsDirectory(Directory):
+    '''A special directory that contains as subdirectories all groups visible to the user.'''
 
-    def size(self):
-        return self.reader.size()
+    def __init__(self, parent_inode, inodes, api, poll_time=60):
+        super(GroupsDirectory, self).__init__(parent_inode)
+        self.inodes = inodes
+        self.api = api
+        try:
+            arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
+        except:
+            self._poll = True
+            self._poll_time = poll_time
+
+    def invalidate(self):
+        with llfuse.lock:
+            super(GroupsDirectory, self).invalidate()
+            for a in self._entries:
+                self._entries[a].invalidate()
+
+    def update(self):
+        groups = self.api.groups().list().execute()
+        oldentries = self._entries
+        self._entries = {}
+        for n in groups['items']:
+            id = n['name']
+            if id in oldentries and oldentries[id].uuid == n['uuid']:
+                self._entries[id] = oldentries[id]
+            else:
+                self._entries[id] = self.inodes.add_entry(GroupDirectory(self.inode, self.inodes, self.api,
+                                                                         n['uuid'], poll=self._poll, poll_time=self._poll_time))
+        self.fresh()
+
+
+class GroupDirectory(Directory):
+    '''A special directory that contains the contents of a group.'''
+
+    def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
+        super(GroupDirectory, self).__init__(parent_inode)
+        self.inodes = inodes
+        self.api = api
+        self.uuid = uuid
+        self._poll = poll
+        self._poll_time = poll_time
+
+    def invalidate(self):
+        with llfuse.lock:
+            super(GroupDirectory, self).invalidate()
+            for a in self._entries:
+                self._entries[a].invalidate()
+
+    def createDirectory(self, parent_inode, inodes, api, uuid, poll, poll_time):
+        print uuid
+        if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
+            return CollectionDirectory(parent_inode, inodes, i['uuid'])
+        if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
+            return ObjectFile(parent_inode, inodes, json.dumps(i))
+        return None
+
+    def update(self):
+        contents = self.api.groups().contents(uuid=self.uuid).execute()
+        links = {}
+        for a in contents['links']:
+            links[a['head_uuid']] = a['name']
+
+        oldentries = self._entries
+        self._entries = {}
+
+        for i in contents['items']:
+            if i['uuid'] in links:
+                n = links[i['uuid']]
+            elif 'name' in i and len(i['name']) > 0:
+                n = i['name']
+            else:
+                n = i['uuid']
+
+            if n in oldentries and oldentries[n].uuid == i['uuid']:
+                self._entries[n] = oldentries[n]
+            else:
+                d = self.createDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+                if d != None:
+                    self._entries[n] = self.inodes.add_entry(d)
+        self.fresh()
 
 
 class FileHandle(object):
@@ -353,7 +470,7 @@ class Operations(llfuse.Operations):
 
         try:
             with llfuse.lock_released:
-                return handle.entry.reader.readfrom(off, size)
+                return handle.entry.readfrom(off, size)
         except:
             raise llfuse.FUSEError(errno.EIO)
 
index 79b9148d7b2abbb6641d0fb720761d168b41b446..849af3b58649e59cf8a12a26f4bc34af01e75d1a 100755 (executable)
@@ -22,6 +22,7 @@ with "--".
     parser.add_argument('--collection', type=str, help="""Mount only the specified collection at the mount point.""")
     parser.add_argument('--tags', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing tagged
 collections on the server.""")
+    parser.add_argument('--groups', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing groups on the server.""")
     parser.add_argument('--debug', action='store_true', help="""Debug mode""")
     parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                         dest="exec_args", metavar=('command', 'args', '...', '--'),
@@ -32,7 +33,10 @@ collections on the server.""")
     # Create the request handler
     operations = Operations(os.getuid(), os.getgid())
 
-    if args.tags:
+    if args.groups:
+        api = arvados.api('v1')
+        e = operations.inodes.add_entry(GroupsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+    elif args.tags:
         api = arvados.api('v1')
         e = operations.inodes.add_entry(TagsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
     elif args.collection != None: