import pprint
import arvados.events
import re
+import apiclient
+import json
from time import time
from llfuse import FUSEError
class FreshBase(object):
    '''Tracks whether an object's cached state is fresh or stale.

    A new object starts out stale.  fresh() marks it up to date and
    records the time; invalidate() forces it stale again.  When polling
    is enabled (self._poll is true) a fresh object also becomes stale
    once more than self._poll_time seconds have passed since the last
    call to fresh().
    '''

    def __init__(self):
        self._stale = True
        self._poll = False
        self._last_update = time()
        # Seconds between refreshes when polling is enabled.
        self._poll_time = 60

    def invalidate(self):
        '''Force the next stale() check to report True.'''
        self._stale = True

    def stale(self):
        '''Return True when the cached state should be refreshed.'''
        if self._stale:
            return True
        if not self._poll:
            return False
        deadline = self._last_update + self._poll_time
        return time() > deadline

    def fresh(self):
        '''Mark the cached state as up to date as of now.'''
        self._last_update = time()
        self._stale = False
+
class File(FreshBase):
    '''Base class for file objects exposed through the filesystem.

    Subclasses override size() and readfrom(); this base behaves like an
    empty file.
    '''

    def __init__(self, parent_inode):
        super(File, self).__init__()
        self.parent_inode = parent_inode
        # Filled in later, presumably when the entry is registered with
        # the inode table -- not visible here.
        self.inode = None

    def size(self):
        '''Return the file length in bytes (always 0 for the base class).'''
        return 0

    def readfrom(self, off, size):
        '''Return up to size bytes starting at off (always empty here).'''
        return ''
'''Wraps a StreamFileReader as a file.'''
def __init__(self, parent_inode, reader):
    '''Expose reader as a file entry whose parent is parent_inode.

    reader must provide size() and readfrom(off, size); the class
    docstring describes it as a StreamFileReader.
    '''
    # The base-class initializers never touch self.reader, so storing it
    # first is equivalent to storing it afterwards.
    self.reader = reader
    super(StreamReaderFile, self).__init__(parent_inode)
def size(self):
    '''Return the size reported by the wrapped reader.'''
    wrapped = self.reader
    return wrapped.size()
def readfrom(self, off, size):
    '''Delegate a read of size bytes at offset off to the wrapped reader.'''
    wrapped = self.reader
    return wrapped.readfrom(off, size)
def stale(self):
    '''Stream-backed file content is never considered stale.

    Overrides the time/invalidate-based check inherited via File.
    '''
    is_stale = False
    return is_stale
+
class ObjectFile(File):
- '''Wraps a serialized object as a file.'''
+ '''Wraps a dict as a serialized json object.'''
def __init__(self, parent_inode, contents):
    '''Expose the dict contents as a JSON text file.

    contents must contain a 'uuid' key (KeyError otherwise).  The JSON
    text is rendered once, here, with 4-space indentation and sorted keys.
    '''
    super(ObjectFile, self).__init__(parent_inode)
    record = contents
    self.contentsdict = record
    self.uuid = record['uuid']
    self.contents = json.dumps(record, sort_keys=True, indent=4)
def size(self):
    '''Return the length of the rendered JSON text.'''
    text = self.contents
    return len(text)
return self.contents[off:(off+size)]
class Directory(FreshBase):
    '''Generic directory object, backed by a dict.

    Consists of a set of entries with the key representing the filename
    and the value referencing a File or Directory object.  Subclasses
    override update() to (re)populate the entries; every access through
    __getitem__, items(), __iter__ or __contains__ first calls update()
    when the directory is stale.

    merge() relies on self.inodes, which this class never sets -- the
    subclasses assign it in their constructors.
    '''

    def __init__(self, parent_inode):
        '''parent_inode is the integer inode number of the parent directory.

        Raises Exception if parent_inode is not an int.
        '''
        super(Directory, self).__init__()
        self.inode = None
        if not isinstance(parent_inode, int):
            raise Exception("parent_inode should be an int")
        self.parent_inode = parent_inode
        self._entries = {}

    def update(self):
        '''Refresh self._entries when stale; overridden by subclasses.'''
        pass

    def size(self):
        '''Size of the directory's disk footprint (stub, always 0).'''
        return 0

    def checkupdate(self):
        '''Call update() if the directory is stale.

        An apiclient HttpError raised by update() is printed and
        swallowed: the current entries stay in place, and because fresh()
        was not reached the directory stays stale, so the next access
        retries.
        '''
        if self.stale():
            try:
                self.update()
            except apiclient.errors.HttpError as e:
                print(e)

    def __getitem__(self, item):
        self.checkupdate()
        return self._entries[item]

    def items(self):
        self.checkupdate()
        return self._entries.items()

    def __iter__(self):
        self.checkupdate()
        # Iterating a dict yields its keys (same as iterkeys() on Python 2).
        return iter(self._entries)

    def __contains__(self, k):
        self.checkupdate()
        return k in self._entries

    def merge(self, items, fn, same, new_entry):
        '''Helper method for updating the contents of the directory.

        items: array with new directory contents

        fn: function to take an entry in 'items' and return the desired file or
        directory name

        same: function to compare an existing entry with an entry in the items
        list to determine whether to keep the existing entry.

        new_entry: function to create a new directory entry from array entry.
        It may return None, in which case the item is skipped.

        If several items map to the same name, only the first is used, so
        that no inode is registered and then silently dropped.  Entries that
        are no longer present are invalidated in the kernel and removed
        from the inode table.  Finally the directory is marked fresh.
        '''
        oldentries = self._entries
        self._entries = {}
        for i in items:
            n = fn(i)
            if n in self._entries:
                # Duplicate name: keep the entry already placed.
                continue
            if n in oldentries and same(oldentries[n], i):
                self._entries[n] = oldentries.pop(n)
            else:
                entry = new_entry(i)
                if entry is not None:
                    self._entries[n] = self.inodes.add_entry(entry)
        for n in oldentries:
            llfuse.invalidate_entry(self.inode, str(n))
            self.inodes.del_entry(oldentries[n])
        self.fresh()
class CollectionDirectory(Directory):
'''Represents the root of a directory tree holding a collection.'''
self.inodes = inodes
self.collection_locator = collection_locator
def same(self, i):
    '''Return True if API record i refers to this directory's collection.'''
    wanted = self.collection_locator
    return wanted == i['uuid']
+
def update(self):
collection = arvados.CollectionReader(arvados.Keep.get(self.collection_locator))
for s in collection.all_streams():
def update(self):
    '''Rebuild the listing with one TagDirectory per distinct tag name.

    Existing TagDirectory entries are kept when their tag matches the
    listed name, so their inodes and cached contents survive a refresh.
    '''
    tags = self.api.links().list(filters=[['link_class', '=', 'tag']], select=['name'], distinct = True).execute()
    self.merge(tags['items'],
               lambda i: i['name'],
               # Compare against the tag name, not the whole API record;
               # comparing to the dict was always False and forced every
               # TagDirectory to be recreated on each refresh.
               lambda a, i: a.tag == i['name'],
               lambda i: TagDirectory(self.inode, self.inodes, self.api, i['name'], poll=self._poll, poll_time=self._poll_time))
class TagDirectory(Directory):
'''A special directory that contains as subdirectories all collections visible
self._poll_time = poll_time
def update(self):
    '''Rebuild the listing from tag links named self.tag on collections.

    Each entry is a CollectionDirectory named by the link's head_uuid.
    '''
    query_filters = [['link_class', '=', 'tag'],
                     ['name', '=', self.tag],
                     ['head_uuid', 'is_a', 'arvados#collection']]
    response = self.api.links().list(filters=query_filters,
                                     select=['head_uuid']).execute()

    def collection_uuid(item):
        return item['head_uuid']

    def is_same(existing, item):
        return existing.collection_locator == collection_uuid(item)

    def make_entry(item):
        return CollectionDirectory(self.inode, self.inodes, collection_uuid(item))

    self.merge(response['items'], collection_uuid, is_same, make_entry)
class GroupsDirectory(Directory):
def update(self):
    '''Rebuild the listing with one GroupDirectory per group, named by uuid.'''
    listing = self.api.groups().list().execute()

    def group_uuid(item):
        return item['uuid']

    def is_same(existing, item):
        return existing.uuid == group_uuid(item)

    def make_group(item):
        return GroupDirectory(self.inode, self.inodes, self.api, item,
                              poll=self._poll, poll_time=self._poll_time)

    self.merge(listing['items'], group_uuid, is_same, make_group)
class GroupDirectory(Directory):
super(GroupDirectory, self).__init__(parent_inode)
self.inodes = inodes
self.api = api
- self.uuid = uuid
+ self.uuid = uuid['uuid']
self._poll = poll
self._poll_time = poll_time
for a in self._entries:
self._entries[a].invalidate()
def createDirectory(self, i):
    '''Build the directory entry for API record i, or return None.

    Dispatch is on the shape of i['uuid']:
      * hex{32}+size (a Keep locator, presumably) -> CollectionDirectory
      * xxxxx-j7d0g-xxxxxxxxxxxxxxx (group uuid)   -> GroupDirectory
      * any other xxxxx-xxxxx-xxxxxxxxxxxxxxx uuid  -> ObjectFile (JSON dump)
    Anything else yields None.

    Every new entry is placed inside this directory, so its parent inode
    is self.inode.  Previously the group and object cases used
    self.parent_inode, which pointed them at the grandparent.
    '''
    uuid = i['uuid']
    if re.match(r'[0-9a-f]{32}\+\d+', uuid):
        return CollectionDirectory(self.inode, self.inodes, uuid)
    elif re.match(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}', uuid):
        return GroupDirectory(self.inode, self.inodes, self.api, i, self._poll, self._poll_time)
    elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}', uuid):
        return ObjectFile(self.inode, i)
    return None
def update(self):
    '''Rebuild the listing from the group's contents (including linked items).

    An item is named by the name of a link pointing at it when one
    exists, otherwise by its uuid.  Existing entries are reused when they
    still refer to the same object, so their inodes survive a refresh.
    '''
    contents = self.api.groups().contents(uuid=self.uuid, include_linked=True).execute()
    # head_uuid -> link name; a later link for the same head wins.
    links = dict((a['head_uuid'], a['name']) for a in contents['links'])

    def choose_name(i):
        return links.get(i['uuid'], i['uuid'])

    def same(a, i):
        if isinstance(a, CollectionDirectory):
            return a.collection_locator == i['uuid']
        elif isinstance(a, GroupDirectory):
            # Keep subgroups instead of tearing them down (and
            # invalidating their inodes) on every refresh.
            return a.uuid == i['uuid']
        elif isinstance(a, ObjectFile):
            # NOTE(review): ObjectFile never calls fresh(), so stale() looks
            # always True here and object files are always recreated --
            # confirm whether that is intended.
            return a.uuid == i['uuid'] and not a.stale()
        return False

    self.merge(contents['items'],
               choose_name,
               same,
               self.createDirectory)
class FileHandle(object):
self._counter += 1
return entry
def del_entry(self, entry):
    '''Drop entry from the inode table after invalidating its kernel cache.

    Raises KeyError if entry.inode is not registered.
    '''
    ino = entry.inode
    llfuse.invalidate_inode(ino)
    del self._entries[ino]
+
class Operations(llfuse.Operations):
'''This is the main interface with llfuse. The methods on this object are
called by llfuse threads to service FUSE events to query and read from
return True
def getattr(self, inode):
+ if inode not in self.inodes:
+ raise llfuse.FUSEError(errno.ENOENT)
+
e = self.inodes[inode]
entry = llfuse.EntryAttributes()
e = off
while e < len(handle.entry):
- yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
+ if handle.entry[e][1].inode in self.inodes:
+ yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
e += 1
def releasedir(self, fh):