import arvados
import pprint
import arvados.events
+import re
from time import time
from llfuse import FUSEError
+class File(object):
+ '''Wraps a StreamFileReader for use by Directory.'''
+
+ def __init__(self, parent_inode):
+ self.inode = None
+ self.parent_inode = parent_inode
+
+ def size(self):
+ return 0
+
+ def readfrom(self, off, size)):
+ return ''
+
+
+class StreamReaderFile(File):
+ '''Wraps a StreamFileReader as a file.'''
+
+ def __init__(self, parent_inode, reader):
+ super(StreamReaderFile, self).__init__(parent_inode, reader)
+ self.reader = reader
+
+ def size(self):
+ return self.reader.size()
+
+ def readfrom(self, off, size)):
+ return self.reader.readfrom(off, size)
+
+
+class ObjectFile(File):
+ '''Wraps a serialized object as a file.'''
+
+ def __init__(self, parent_inode, contents):
+ super(ObjectFile, self).__init__(parent_inode, reader)
+ self.contents = contents
+
+ def size(self):
+ return len(self.contents)
+
+ def readfrom(self, off, size):
+ return self.contents[off:(off+size)]
+
+
class Directory(object):
'''Generic directory object, backed by a dict.
Consists of a set of entries with the key representing the filename
cwd._entries[part] = self.inodes.add_entry(Directory(cwd.inode))
cwd = cwd._entries[part]
for k, v in s.files().items():
- cwd._entries[k] = self.inodes.add_entry(File(cwd.inode, v))
+ cwd._entries[k] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v))
self.fresh()
+
class MagicDirectory(Directory):
'''A special directory that logically contains the set of all extant keep
locators. When a file is referenced by lookup(), it is tested to see if it
self._entries[a].invalidate()
def update(self):
- tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = 'name').execute()
+ tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
oldentries = self._entries
self._entries = {}
for n in tags['items']:
self._entries[n] = self.inodes.add_entry(CollectionDirectory(self.inode, self.inodes, n))
self.fresh()
-class File(object):
- '''Wraps a StreamFileReader for use by Directory.'''
- def __init__(self, parent_inode, reader):
- self.inode = None
- self.parent_inode = parent_inode
- self.reader = reader
+class GroupsDirectory(Directory):
+ '''A special directory that contains as subdirectories all groups visible to the user.'''
- def size(self):
- return self.reader.size()
+ def __init__(self, parent_inode, inodes, api, poll_time=60):
+ super(GroupsDirectory, self).__init__(parent_inode)
+ self.inodes = inodes
+ self.api = api
+ try:
+ arvados.events.subscribe(self.api, [], lambda ev: self.invalidate())
+ except:
+ self._poll = True
+ self._poll_time = poll_time
+
+ def invalidate(self):
+ with llfuse.lock:
+ super(GroupsDirectory, self).invalidate()
+ for a in self._entries:
+ self._entries[a].invalidate()
+
+ def update(self):
+ groups = self.api.groups().list().execute()
+ oldentries = self._entries
+ self._entries = {}
+ for n in groups['items']:
+ id = n['name']
+ if id in oldentries and oldentries[id].uuid == n['uuid']:
+ self._entries[id] = oldentries[id]
+ else:
+ self._entries[id] = self.inodes.add_entry(GroupDirectory(self.inode, self.inodes, self.api,
+ n['uuid'], poll=self._poll, poll_time=self._poll_time))
+ self.fresh()
+
+
+class GroupDirectory(Directory):
+ '''A special directory that contains the contents of a group.'''
+
+ def __init__(self, parent_inode, inodes, api, uuid, poll=False, poll_time=60):
+ super(GroupDirectory, self).__init__(parent_inode)
+ self.inodes = inodes
+ self.api = api
+ self.uuid = uuid
+ self._poll = poll
+ self._poll_time = poll_time
+
+ def invalidate(self):
+ with llfuse.lock:
+ super(GroupDirectory, self).invalidate()
+ for a in self._entries:
+ self._entries[a].invalidate()
+
+ def createDirectory(self, parent_inode, inodes, api, uuid, poll, poll_time):
+ print uuid
+ if re.match(r'[0-9a-f]{32}\+\d+', i['uuid']):
+ return CollectionDirectory(parent_inode, inodes, i['uuid'])
+ if re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', i['uuid']):
+ return ObjectFile(parent_inode, inodes, json.dumps(i))
+ return None
+
+ def update(self):
+ contents = self.api.groups().contents(uuid=self.uuid).execute()
+ links = {}
+ for a in contents['links']:
+ links[a['head_uuid']] = a['name']
+
+ oldentries = self._entries
+ self._entries = {}
+
+ for i in contents['items']:
+ if i['uuid'] in links:
+ n = links[i['uuid']]
+ elif 'name' in i and len(i['name']) > 0:
+ n = i['name']
+ else:
+ n = i['uuid']
+
+ if n in oldentries and oldentries[n].uuid == i['uuid']:
+ self._entries[n] = oldentries[n]
+ else:
+ d = self.createDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
+ if d != None:
+ self._entries[n] = self.inodes.add_entry(d)
+ self.fresh()
class FileHandle(object):
try:
with llfuse.lock_released:
- return handle.entry.reader.readfrom(off, size)
+ return handle.entry.readfrom(off, size)
except:
raise llfuse.FUSEError(errno.EIO)
parser.add_argument('--collection', type=str, help="""Mount only the specified collection at the mount point.""")
parser.add_argument('--tags', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing tagged
collections on the server.""")
+ parser.add_argument('--groups', action='store_true', help="""Mount as a virtual directory consisting of subdirectories representing groups on the server.""")
parser.add_argument('--debug', action='store_true', help="""Debug mode""")
parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
dest="exec_args", metavar=('command', 'args', '...', '--'),
# Create the request handler
operations = Operations(os.getuid(), os.getgid())
- if args.tags:
+ if args.groups:
+ api = arvados.api('v1')
+ e = operations.inodes.add_entry(GroupsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
+ elif args.tags:
api = arvados.api('v1')
e = operations.inodes.add_entry(TagsDirectory(llfuse.ROOT_INODE, operations.inodes, api))
elif args.collection != None: