3198: Add enable_write flag to FUSE and --enable-write and --read-only to
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 3393ac951420af1366edb71e01da2ed3081eabb6..a2b91a67187a27122bece6c8c381fcce823cdc19 100644 (file)
@@ -1,6 +1,49 @@
-#
-# FUSE driver for Arvados Keep
-#
+"""FUSE driver for Arvados Keep
+
+Architecture:
+
+There is one `Operations` object per mount point.  It is the entry point for all
+read and write requests from the llfuse module.
+
+The operations object owns an `Inodes` object.  The inodes object stores the
+mapping from numeric inode (used throughout the file system API to uniquely
+identify files) to the Python objects that implement files and directories.
+
+The `Inodes` object owns an `InodeCache` object.  The inode cache records the
+memory footprint of file system objects and when they are last used.  When the
+cache limit is exceeded, the least recently used objects are cleared.
+
+File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
+
+File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
+which implement actual reads and writes.
+
+Directory objects inherit from `fusedir.Directory`.  The directory object wraps
+a Python dict which stores the mapping from filenames to directory entries.
+Directory contents can be accessed through Python operators such as `[]`
+and `in`.  These methods automatically check if the directory is fresh (up to
+date) or stale (needs update) and will call `update` if necessary before
+returning a result.
+
+The general FUSE operation flow is as follows:
+
+- The request handler is called with either an inode or file handle that is the
+  subject of the operation.
+
+- Look up the inode using the Inodes table or the file handle in the
+  filehandles table to get the file system object.
+
+- For methods that alter files or directories, check that the operation is
+  valid and permitted using _check_writable().
+
+- Call the relevant method on the file system object.
+
+- Return the result.
+
+The FUSE driver supports the Arvados event bus.  When an event is received for
+an object that is live in the inode cache, that object is immediately updated.
+
+"""
 
 import os
 import sys
@@ -20,693 +63,131 @@ import _strptime
 import calendar
 import threading
 import itertools
+import ciso8601
+import collections
+import functools
 
-from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
+from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
-# Match any character which FUSE or Linux cannot accommodate as part
-# of a filename. (If present in a collection filename, they will
-# appear as underscores in the fuse mount.)
-_disallowed_filename_characters = re.compile('[\x00/]')
-
-class SafeApi(object):
-    '''Threadsafe wrapper for API object.  This stores and returns a different api
-    object per thread, because httplib2 which underlies apiclient is not
-    threadsafe.
-    '''
-
-    def __init__(self, config):
-        self.host = config.get('ARVADOS_API_HOST')
-        self.api_token = config.get('ARVADOS_API_TOKEN')
-        self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
-        self.local = threading.local()
-        self.block_cache = arvados.KeepBlockCache()
-
-    def localapi(self):
-        if 'api' not in self.local.__dict__:
-            self.local.api = arvados.api('v1', False, self.host,
-                                         self.api_token, self.insecure)
-        return self.local.api
-
-    def localkeep(self):
-        if 'keep' not in self.local.__dict__:
-            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
-        return self.local.keep
-
-    def __getattr__(self, name):
-        # Proxy nonexistent attributes to the local API client.
-        try:
-            return getattr(self.localapi(), name)
-        except AttributeError:
-            return super(SafeApi, self).__getattr__(name)
-
-
-def convertTime(t):
-    '''Parse Arvados timestamp to unix time.'''
-    try:
-        return calendar.timegm(time.strptime(t, "%Y-%m-%dT%H:%M:%SZ"))
-    except (TypeError, ValueError):
-        return 0
-
-def sanitize_filename(dirty):
-    '''Replace disallowed filename characters with harmless "_".'''
-    if dirty is None:
-        return None
-    elif dirty == '':
-        return '_'
-    elif dirty == '.':
-        return '_'
-    elif dirty == '..':
-        return '__'
-    else:
-        return _disallowed_filename_characters.sub('_', dirty)
-
-
-class FreshBase(object):
-    '''Base class for maintaining fresh/stale state to determine when to update.'''
-    def __init__(self):
-        self._stale = True
-        self._poll = False
-        self._last_update = time.time()
-        self._atime = time.time()
-        self._poll_time = 60
-
-    # Mark the value as stale
-    def invalidate(self):
-        self._stale = True
-
-    # Test if the entries dict is stale.
-    def stale(self):
-        if self._stale:
-            return True
-        if self._poll:
-            return (self._last_update + self._poll_time) < self._atime
-        return False
-
-    def fresh(self):
-        self._stale = False
-        self._last_update = time.time()
-
-    def atime(self):
-        return self._atime
-
-class File(FreshBase):
-    '''Base for file objects.'''
-
-    def __init__(self, parent_inode, _mtime=0):
-        super(File, self).__init__()
-        self.inode = None
-        self.parent_inode = parent_inode
-        self._mtime = _mtime
-
-    def size(self):
-        return 0
-
-    def readfrom(self, off, size):
-        return ''
-
-    def mtime(self):
-        return self._mtime
-
-
-class StreamReaderFile(File):
-    '''Wraps a StreamFileReader as a file.'''
-
-    def __init__(self, parent_inode, reader, _mtime):
-        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
-        self.reader = reader
-
-    def size(self):
-        return self.reader.size()
-
-    def readfrom(self, off, size):
-        return self.reader.readfrom(off, size)
-
-    def stale(self):
-        return False
-
-
-class StringFile(File):
-    '''Wrap a simple string as a file'''
-    def __init__(self, parent_inode, contents, _mtime):
-        super(StringFile, self).__init__(parent_inode, _mtime)
-        self.contents = contents
-
-    def size(self):
-        return len(self.contents)
-
-    def readfrom(self, off, size):
-        return self.contents[off:(off+size)]
-
-
-class ObjectFile(StringFile):
-    '''Wrap a dict as a serialized json object.'''
-
-    def __init__(self, parent_inode, obj):
-        super(ObjectFile, self).__init__(parent_inode, "", 0)
-        self.uuid = obj['uuid']
-        self.update(obj)
-
-    def update(self, obj):
-        self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
-        self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
-
-
-class Directory(FreshBase):
-    '''Generic directory object, backed by a dict.
-    Consists of a set of entries with the key representing the filename
-    and the value referencing a File or Directory object.
-    '''
+# Uncomment this to enable llfuse debug logging.
+# log_handler = logging.StreamHandler()
+# llogger = logging.getLogger('llfuse')
+# llogger.addHandler(log_handler)
+# llogger.setLevel(logging.DEBUG)
 
-    def __init__(self, parent_inode):
-        super(Directory, self).__init__()
+class Handle(object):
+    """Connects a numeric file handle to a File or Directory object that has
+    been opened by the client."""
 
-        '''parent_inode is the integer inode number'''
-        self.inode = None
-        if not isinstance(parent_inode, int):
-            raise Exception("parent_inode should be an int")
-        self.parent_inode = parent_inode
-        self._entries = {}
-        self._mtime = time.time()
-
-    #  Overriden by subclasses to implement logic to update the entries dict
-    #  when the directory is stale
-    def update(self):
-        pass
-
-    # Only used when computing the size of the disk footprint of the directory
-    # (stub)
-    def size(self):
-        return 0
-
-    def checkupdate(self):
-        if self.stale():
-            try:
-                self.update()
-            except apiclient.errors.HttpError as e:
-                _logger.debug(e)
-
-    def __getitem__(self, item):
-        self.checkupdate()
-        return self._entries[item]
-
-    def items(self):
-        self.checkupdate()
-        return self._entries.items()
-
-    def __iter__(self):
-        self.checkupdate()
-        return self._entries.iterkeys()
-
-    def __contains__(self, k):
-        self.checkupdate()
-        return k in self._entries
-
-    def merge(self, items, fn, same, new_entry):
-        '''Helper method for updating the contents of the directory.  Takes a list
-        describing the new contents of the directory, reuse entries that are
-        the same in both the old and new lists, create new entries, and delete
-        old entries missing from the new list.
-
-        items: iterable with new directory contents
-
-        fn: function to take an entry in 'items' and return the desired file or
-        directory name, or None if this entry should be skipped
-
-        same: function to compare an existing entry (a File or Directory
-        object) with an entry in the items list to determine whether to keep
-        the existing entry.
-
-        new_entry: function to create a new directory entry (File or Directory
-        object) from an entry in the items list.
+    def __init__(self, fh, obj):
+        self.fh = fh
+        self.obj = obj
+        self.obj.inc_use()
 
-        '''
+    def release(self):
+        self.obj.dec_use()
 
-        oldentries = self._entries
-        self._entries = {}
-        changed = False
-        for i in items:
-            name = sanitize_filename(fn(i))
-            if name:
-                if name in oldentries and same(oldentries[name], i):
-                    # move existing directory entry over
-                    self._entries[name] = oldentries[name]
-                    del oldentries[name]
-                else:
-                    # create new directory entry
-                    ent = new_entry(i)
-                    if ent is not None:
-                        self._entries[name] = self.inodes.add_entry(ent)
-                        changed = True
-
-        # delete any other directory entries that were not in found in 'items'
-        for i in oldentries:
-            llfuse.invalidate_entry(self.inode, str(i))
-            self.inodes.del_entry(oldentries[i])
-            changed = True
-
-        if changed:
-            self._mtime = time.time()
-
-        self.fresh()
-
-    def clear(self):
-        '''Delete all entries'''
-        oldentries = self._entries
-        self._entries = {}
-        for n in oldentries:
-            if isinstance(n, Directory):
-                n.clear()
-            llfuse.invalidate_entry(self.inode, str(n))
-            self.inodes.del_entry(oldentries[n])
-        llfuse.invalidate_inode(self.inode)
-        self.invalidate()
+    def flush(self):
+        return self.obj.flush()
 
-    def mtime(self):
-        return self._mtime
 
+class FileHandle(Handle):
+    """Connects a numeric file handle to a File  object that has
+    been opened by the client."""
+    pass
 
-class CollectionDirectory(Directory):
-    '''Represents the root of a directory tree holding a collection.'''
 
-    def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
-        self.collection_object_file = None
-        self.collection_object = None
-        if isinstance(collection, dict):
-            self.collection_locator = collection['uuid']
-        else:
-            self.collection_locator = collection
-
-    def same(self, i):
-        return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
-
-    # Used by arv-web.py to switch the contents of the CollectionDirectory
-    def change_collection(self, new_locator):
-        """Switch the contents of the CollectionDirectory.  Must be called with llfuse.lock held."""
-        self.collection_locator = new_locator
-        self.collection_object = None
-        self.update()
-
-    def new_collection(self, new_collection_object, coll_reader):
-        self.collection_object = new_collection_object
-
-        if self.collection_object_file is not None:
-            self.collection_object_file.update(self.collection_object)
-
-        self.clear()
-        for s in coll_reader.all_streams():
-            cwd = self
-            for part in s.name().split('/'):
-                if part != '' and part != '.':
-                    partname = sanitize_filename(part)
-                    if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
-                    cwd = cwd._entries[partname]
-            for k, v in s.files().items():
-                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
-
-    def update(self):
-        try:
-            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
-                return True
-
-            if self.collection_locator is None:
-                self.fresh()
-                return True
-
-            with llfuse.lock_released:
-                coll_reader = arvados.CollectionReader(
-                    self.collection_locator, self.api, self.api.localkeep(),
-                    num_retries=self.num_retries)
-                new_collection_object = coll_reader.api_response() or {}
-                # If the Collection only exists in Keep, there will be no API
-                # response.  Fill in the fields we need.
-                if 'uuid' not in new_collection_object:
-                    new_collection_object['uuid'] = self.collection_locator
-                if "portable_data_hash" not in new_collection_object:
-                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
-                if 'manifest_text' not in new_collection_object:
-                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
-                coll_reader.normalize()
-            # end with llfuse.lock_released, re-acquire lock
-
-            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object, coll_reader)
-
-            self.fresh()
-            return True
-        except arvados.errors.NotFoundError:
-            _logger.exception("arv-mount %s: error", self.collection_locator)
-        except arvados.errors.ArgumentError as detail:
-            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-        except Exception:
-            _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
-        return False
-
-    def __getitem__(self, item):
-        self.checkupdate()
-        if item == '.arvados#collection':
-            if self.collection_object_file is None:
-                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
-                self.inodes.add_entry(self.collection_object_file)
-            return self.collection_object_file
-        else:
-            return super(CollectionDirectory, self).__getitem__(item)
+class DirectoryHandle(Handle):
+    """Connects a numeric file handle to a Directory object that has
+    been opened by the client."""
 
-    def __contains__(self, k):
-        if k == '.arvados#collection':
-            return True
-        else:
-            return super(CollectionDirectory, self).__contains__(k)
-
-    def mtime(self):
-        self.checkupdate()
-        return convertTime(self.collection_object["modified_at"]) if self.collection_object is not None and 'modified_at' in self.collection_object else 0
-
-
-class MagicDirectory(Directory):
-    '''A special directory that logically contains the set of all extant keep
-    locators.  When a file is referenced by lookup(), it is tested to see if it
-    is a valid keep locator to a manifest, and if so, loads the manifest
-    contents as a subdirectory of this directory with the locator as the
-    directory name.  Since querying a list of all extant keep locators is
-    impractical, only collections that have already been accessed are visible
-    to readdir().
-    '''
-
-    README_TEXT = '''
-This directory provides access to Arvados collections as subdirectories listed
-by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
-
-Note that this directory will appear empty until you attempt to access a
-specific collection subdirectory (such as trying to 'cd' into it), at which
-point the collection will actually be looked up on the server and the directory
-will appear if it exists.
-'''.lstrip()
-
-    def __init__(self, parent_inode, inodes, api, num_retries):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
+    def __init__(self, fh, dirobj, entries):
+        super(DirectoryHandle, self).__init__(fh, dirobj)
+        self.entries = entries
 
-    def __setattr__(self, name, value):
-        super(MagicDirectory, self).__setattr__(name, value)
-        # When we're assigned an inode, add a README.
-        if ((name == 'inode') and (self.inode is not None) and
-              (not self._entries)):
-            self._entries['README'] = self.inodes.add_entry(
-                StringFile(self.inode, self.README_TEXT, time.time()))
-            # If we're the root directory, add an identical by_id subdirectory.
-            if self.inode == llfuse.ROOT_INODE:
-                self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
-                        self.inode, self.inodes, self.api, self.num_retries))
 
-    def __contains__(self, k):
-        if k in self._entries:
-            return True
+class InodeCache(object):
+    """Records the memory footprint of objects and when they are last used.
 
-        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
-            return False
+    When the cache limit is exceeded, the least recently used objects are
+    cleared.  Clearing the object means discarding its contents to release
+    memory.  The next time the object is accessed, it must be re-fetched from
+    the server.  Note that the inode cache limit is a soft limit: the cache
+    limit may be exceeded if necessary to load very large objects, and it may
+    also be exceeded if open file handles prevent objects from being cleared.
 
-        try:
-            e = self.inodes.add_entry(CollectionDirectory(
-                    self.inode, self.inodes, self.api, self.num_retries, k))
-            if e.update():
-                self._entries[k] = e
-                return True
-            else:
-                return False
-        except Exception as e:
-            _logger.debug('arv-mount exception keep %s', e)
-            return False
+    """
 
-    def __getitem__(self, item):
-        if item in self:
-            return self._entries[item]
-        else:
-            raise KeyError("No collection with id " + item)
+    def __init__(self, cap, min_entries=4):
+        self._entries = collections.OrderedDict()
+        self._by_uuid = {}
+        self._counter = itertools.count(0)
+        self.cap = cap
+        self._total = 0
+        self.min_entries = min_entries
 
+    def total(self):
+        return self._total
 
-class RecursiveInvalidateDirectory(Directory):
-    def invalidate(self):
-        if self.inode == llfuse.ROOT_INODE:
-            llfuse.lock.acquire()
-        try:
-            super(RecursiveInvalidateDirectory, self).invalidate()
-            for a in self._entries:
-                self._entries[a].invalidate()
-        except Exception:
-            _logger.exception()
-        finally:
-            if self.inode == llfuse.ROOT_INODE:
-                llfuse.lock.release()
-
-
-class TagsDirectory(RecursiveInvalidateDirectory):
-    '''A special directory that contains as subdirectories all tags visible to the user.'''
-
-    def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
-        self._poll = True
-        self._poll_time = poll_time
-
-    def update(self):
-        with llfuse.lock_released:
-            tags = self.api.links().list(
-                filters=[['link_class', '=', 'tag']],
-                select=['name'], distinct=True
-                ).execute(num_retries=self.num_retries)
-        if "items" in tags:
-            self.merge(tags['items'],
-                       lambda i: i['name'],
-                       lambda a, i: a.tag == i['name'],
-                       lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
-
-
-class TagDirectory(Directory):
-    '''A special directory that contains as subdirectories all collections visible
-    to the user that are tagged with a particular tag.
-    '''
-
-    def __init__(self, parent_inode, inodes, api, num_retries, tag,
-                 poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
-        self.tag = tag
-        self._poll = poll
-        self._poll_time = poll_time
-
-    def update(self):
-        with llfuse.lock_released:
-            taggedcollections = self.api.links().list(
-                filters=[['link_class', '=', 'tag'],
-                         ['name', '=', self.tag],
-                         ['head_uuid', 'is_a', 'arvados#collection']],
-                select=['head_uuid']
-                ).execute(num_retries=self.num_retries)
-        self.merge(taggedcollections['items'],
-                   lambda i: i['head_uuid'],
-                   lambda a, i: a.collection_locator == i['head_uuid'],
-                   lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
-
-
-class ProjectDirectory(Directory):
-    '''A special directory that contains the contents of a project.'''
-
-    def __init__(self, parent_inode, inodes, api, num_retries, project_object,
-                 poll=False, poll_time=60):
-        super(ProjectDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
-        self.project_object = project_object
-        self.project_object_file = None
-        self.uuid = project_object['uuid']
-        self._poll = poll
-        self._poll_time = poll_time
-
-    def createDirectory(self, i):
-        if collection_uuid_pattern.match(i['uuid']):
-            return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
-        elif group_uuid_pattern.match(i['uuid']):
-            return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time)
-        elif link_uuid_pattern.match(i['uuid']):
-            if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
-                return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
-            else:
-                return None
-        elif uuid_pattern.match(i['uuid']):
-            return ObjectFile(self.parent_inode, i)
-        else:
-            return None
-
-    def update(self):
-        if self.project_object_file == None:
-            self.project_object_file = ObjectFile(self.inode, self.project_object)
-            self.inodes.add_entry(self.project_object_file)
-
-        def namefn(i):
-            if 'name' in i:
-                if i['name'] is None or len(i['name']) == 0:
-                    return None
-                elif collection_uuid_pattern.match(i['uuid']) or group_uuid_pattern.match(i['uuid']):
-                    # collection or subproject
-                    return i['name']
-                elif link_uuid_pattern.match(i['uuid']) and i['head_kind'] == 'arvados#collection':
-                    # name link
-                    return i['name']
-                elif 'kind' in i and i['kind'].startswith('arvados#'):
-                    # something else
-                    return "{}.{}".format(i['name'], i['kind'][8:])
-            else:
-                return None
-
-        def samefn(a, i):
-            if isinstance(a, CollectionDirectory):
-                return a.collection_locator == i['uuid']
-            elif isinstance(a, ProjectDirectory):
-                return a.uuid == i['uuid']
-            elif isinstance(a, ObjectFile):
-                return a.uuid == i['uuid'] and not a.stale()
+    def _remove(self, obj, clear):
+        if clear and not obj.clear():
+            _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
             return False
+        self._total -= obj.cache_size
+        del self._entries[obj.cache_priority]
+        if obj.cache_uuid:
+            del self._by_uuid[obj.cache_uuid]
+            obj.cache_uuid = None
+        if clear:
+            _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
+        return True
 
-        with llfuse.lock_released:
-            if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-            elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-
-            contents = arvados.util.list_all(self.api.groups().contents,
-                                             self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
-
-        # end with llfuse.lock_released, re-acquire lock
-
-        self.merge(contents,
-                   namefn,
-                   samefn,
-                   self.createDirectory)
-
-    def __getitem__(self, item):
-        self.checkupdate()
-        if item == '.arvados#project':
-            return self.project_object_file
-        else:
-            return super(ProjectDirectory, self).__getitem__(item)
-
-    def __contains__(self, k):
-        if k == '.arvados#project':
-            return True
+    def cap_cache(self):
+        if self._total > self.cap:
+            for key in list(self._entries.keys()):
+                if self._total < self.cap or len(self._entries) < self.min_entries:
+                    break
+                self._remove(self._entries[key], True)
+
+    def manage(self, obj):
+        if obj.persisted():
+            obj.cache_priority = next(self._counter)
+            obj.cache_size = obj.objsize()
+            self._entries[obj.cache_priority] = obj
+            obj.cache_uuid = obj.uuid()
+            if obj.cache_uuid:
+                self._by_uuid[obj.cache_uuid] = obj
+            self._total += obj.objsize()
+            _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
+            self.cap_cache()
         else:
-            return super(ProjectDirectory, self).__contains__(k)
+            obj.cache_priority = None
 
+    def touch(self, obj):
+        if obj.persisted():
+            if obj.cache_priority in self._entries:
+                self._remove(obj, False)
+            self.manage(obj)
 
-class SharedDirectory(Directory):
-    '''A special directory that represents users or groups who have shared projects with me.'''
-
-    def __init__(self, parent_inode, inodes, api, num_retries, exclude,
-                 poll=False, poll_time=60):
-        super(SharedDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
-        self.api = api
-        self.num_retries = num_retries
-        self.current_user = api.users().current().execute(num_retries=num_retries)
-        self._poll = True
-        self._poll_time = poll_time
-
-    def update(self):
-        with llfuse.lock_released:
-            all_projects = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['group_class','=','project']])
-            objects = {}
-            for ob in all_projects:
-                objects[ob['uuid']] = ob
-
-            roots = []
-            root_owners = {}
-            for ob in all_projects:
-                if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
-                    roots.append(ob)
-                    root_owners[ob['owner_uuid']] = True
-
-            lusers = arvados.util.list_all(
-                self.api.users().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-            lgroups = arvados.util.list_all(
-                self.api.groups().list, self.num_retries,
-                filters=[['uuid','in', list(root_owners)]])
-
-            users = {}
-            groups = {}
-
-            for l in lusers:
-                objects[l["uuid"]] = l
-            for l in lgroups:
-                objects[l["uuid"]] = l
-
-            contents = {}
-            for r in root_owners:
-                if r in objects:
-                    obr = objects[r]
-                    if "name" in obr:
-                        contents[obr["name"]] = obr
-                    if "first_name" in obr:
-                        contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
-
-            for r in roots:
-                if r['owner_uuid'] not in objects:
-                    contents[r['name']] = r
-
-        # end with llfuse.lock_released, re-acquire lock
-
-        try:
-            self.merge(contents.items(),
-                       lambda i: i[0],
-                       lambda a, i: a.uuid == i[1]['uuid'],
-                       lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
-        except Exception:
-            _logger.exception()
-
-
-class FileHandle(object):
-    '''Connects a numeric file handle to a File or Directory object that has
-    been opened by the client.'''
-
-    def __init__(self, fh, entry):
-        self.fh = fh
-        self.entry = entry
+    def unmanage(self, obj):
+        if obj.persisted() and obj.cache_priority in self._entries:
+            self._remove(obj, True)
 
+    def find(self, uuid):
+        return self._by_uuid.get(uuid)
 
 class Inodes(object):
-    '''Manage the set of inodes.  This is the mapping from a numeric id
-    to a concrete File or Directory object'''
+    """Manage the set of inodes.  This is the mapping from a numeric id
+    to a concrete File or Directory object"""
 
-    def __init__(self):
+    def __init__(self, inode_cache, encoding="utf-8"):
         self._entries = {}
         self._counter = itertools.count(llfuse.ROOT_INODE)
+        self.inode_cache = inode_cache
+        self.encoding = encoding
 
     def __getitem__(self, item):
         return self._entries[item]
@@ -723,48 +204,132 @@ class Inodes(object):
     def __contains__(self, k):
         return k in self._entries
 
+    def touch(self, entry):
+        entry._atime = time.time()
+        self.inode_cache.touch(entry)
+
     def add_entry(self, entry):
         entry.inode = next(self._counter)
+        if entry.inode == llfuse.ROOT_INODE:
+            entry.inc_ref()
         self._entries[entry.inode] = entry
+        self.inode_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
-        llfuse.invalidate_inode(entry.inode)
-        del self._entries[entry.inode]
+        if entry.ref_count == 0:
+            _logger.debug("Deleting inode %i", entry.inode)
+            self.inode_cache.unmanage(entry)
+            llfuse.invalidate_inode(entry.inode)
+            entry.finalize()
+            del self._entries[entry.inode]
+            entry.inode = None
+        else:
+            entry.dead = True
+            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+
+def catch_exceptions(orig_func):
+    """Catch uncaught exceptions and log them consistently."""
+
+    @functools.wraps(orig_func)
+    def catch_exceptions_wrapper(self, *args, **kwargs):
+        try:
+            return orig_func(self, *args, **kwargs)
+        except llfuse.FUSEError:
+            raise
+        except EnvironmentError as e:
+            raise llfuse.FUSEError(e.errno)
+        except:
+            _logger.exception("Unhandled exception during FUSE operation")
+            raise llfuse.FUSEError(errno.EIO)
+
+    return catch_exceptions_wrapper
+
 
 class Operations(llfuse.Operations):
-    '''This is the main interface with llfuse.  The methods on this object are
-    called by llfuse threads to service FUSE events to query and read from
-    the file system.
+    """This is the main interface with llfuse.
+
+    The methods on this object are called by llfuse threads to service FUSE
+    events to query and read from the file system.
 
     llfuse has its own global lock which is acquired before calling a request handler,
     so request handlers do not run concurrently unless the lock is explicitly released
-    using "with llfuse.lock_released:"'''
+    using 'with llfuse.lock_released:'
 
-    def __init__(self, uid, gid, encoding="utf-8"):
+    """
+
+    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
         super(Operations, self).__init__()
 
-        self.inodes = Inodes()
+        if not inode_cache:
+            inode_cache = InodeCache(cap=256*1024*1024)
+        self.inodes = Inodes(inode_cache, encoding=encoding)
         self.uid = uid
         self.gid = gid
-        self.encoding = encoding
+        self.enable_write = enable_write
 
         # dict of inode to filehandle
         self._filehandles = {}
-        self._filehandles_counter = 1
+        self._filehandles_counter = itertools.count(0)
 
         # Other threads that need to wait until the fuse driver
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
+        self.num_retries = num_retries
+
+        self.events = None
+
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue
         self.initlock.set()
 
+    @catch_exceptions
+    def destroy(self):
+        if self.events:
+            self.events.close()
+            self.events = None
+
+        for k,v in self.inodes.items():
+            v.finalize()
+        self.inodes = None
+
     def access(self, inode, mode, ctx):
         return True
 
+    def listen_for_events(self, api_client):
+        self.events = arvados.events.subscribe(api_client,
+                                 [["event_type", "in", ["create", "update", "delete"]]],
+                                 self.on_event)
+
+    @catch_exceptions
+    def on_event(self, ev):
+        if 'event_type' in ev:
+            with llfuse.lock:
+                item = self.inodes.inode_cache.find(ev["object_uuid"])
+                if item is not None:
+                    item.invalidate()
+                    if ev["object_kind"] == "arvados#collection":
+                        new_attr = ev.get("properties") and ev["properties"].get("new_attributes") and ev["properties"]["new_attributes"]
+                        record_version = (new_attr["modified_at"], new_attr["portable_data_hash"]) if new_attr else None
+                        item.update(to_record_version=record_version)
+                    else:
+                        item.update()
+
+                oldowner = ev.get("properties") and ev["properties"].get("old_attributes") and ev["properties"]["old_attributes"].get("owner_uuid")
+                olditemparent = self.inodes.inode_cache.find(oldowner)
+                if olditemparent is not None:
+                    olditemparent.invalidate()
+                    olditemparent.update()
+
+                itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
+                if itemparent is not None:
+                    itemparent.invalidate()
+                    itemparent.update()
+
+    @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
             raise llfuse.FUSEError(errno.ENOENT)
@@ -774,16 +339,19 @@ class Operations(llfuse.Operations):
         entry = llfuse.EntryAttributes()
         entry.st_ino = inode
         entry.generation = 0
-        entry.entry_timeout = 300
-        entry.attr_timeout = 300
+        entry.entry_timeout = 60
+        entry.attr_timeout = 60
 
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
-        elif isinstance(e, StreamReaderFile):
-            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
         else:
             entry.st_mode |= stat.S_IFREG
+            if isinstance(e, FuseArvadosFile):
+                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+
+        if self.enable_write and e.writable():
+            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
 
         entry.st_nlink = 1
         entry.st_uid = self.uid
@@ -793,17 +361,29 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (e.size()/512)+1
+        entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
         entry.st_mtime = int(e.mtime())
         entry.st_ctime = int(e.mtime())
 
         return entry
 
+    @catch_exceptions
+    def setattr(self, inode, attr):
+        entry = self.getattr(inode)
+
+        e = self.inodes[inode]
+
+        if attr.st_size is not None and isinstance(e, FuseArvadosFile):
+            with llfuse.lock_released:
+                e.arvfile.truncate(attr.st_size)
+                entry.st_size = e.arvfile.size()
+
+        return entry
+
+    @catch_exceptions
     def lookup(self, parent_inode, name):
-        name = unicode(name, self.encoding)
-        _logger.debug("arv-mount lookup: parent_inode %i name %s",
-                      parent_inode, name)
+        name = unicode(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
@@ -817,27 +397,42 @@ class Operations(llfuse.Operations):
                     inode = p[name].inode
 
         if inode != None:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
+                      parent_inode, name, inode)
+            self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
+                      parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @catch_exceptions
+    def forget(self, inodes):
+        for inode, nlookup in inodes:
+            ent = self.inodes[inode]
+            _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
+            if ent.dec_ref(nlookup) == 0 and ent.dead:
+                self.inodes.del_entry(ent)
+
+    @catch_exceptions
     def open(self, inode, flags):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
             raise llfuse.FUSEError(errno.ENOENT)
 
-        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
-            raise llfuse.FUSEError(errno.EROFS)
-
         if isinstance(p, Directory):
             raise llfuse.FUSEError(errno.EISDIR)
 
-        fh = self._filehandles_counter
-        self._filehandles_counter += 1
+        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        fh = next(self._filehandles_counter)
         self._filehandles[fh] = FileHandle(fh, p)
+        self.inodes.touch(p)
         return fh
 
+    @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
         if fh in self._filehandles:
@@ -845,23 +440,46 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        # update atime
-        handle.entry._atime = time.time()
+        self.inodes.touch(handle.obj)
 
         try:
-            with llfuse.lock_released:
-                return handle.entry.readfrom(off, size)
+            return handle.obj.readfrom(off, size, self.num_retries)
         except arvados.errors.NotFoundError as e:
-            _logger.warning("Block not found: " + str(e))
-            raise llfuse.FUSEError(errno.EIO)
-        except Exception:
-            _logger.exception()
+            _logger.error("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
 
+    @catch_exceptions
+    def write(self, fh, off, buf):
+        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
+        if fh in self._filehandles:
+            handle = self._filehandles[fh]
+        else:
+            raise llfuse.FUSEError(errno.EBADF)
+
+        if not handle.obj.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.inodes.touch(handle.obj)
+
+        return handle.obj.writeto(off, buf, self.num_retries)
+
+    @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
+            try:
+                self._filehandles[fh].flush()
+            except EnvironmentError as e:
+                raise llfuse.FUSEError(e.errno)
+            except Exception:
+                _logger.exception("Flush error")
+            self._filehandles[fh].release()
             del self._filehandles[fh]
+        self.inodes.inode_cache.cap_cache()
 
+    def releasedir(self, fh):
+        self.release(fh)
+
+    @catch_exceptions
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -873,19 +491,19 @@ class Operations(llfuse.Operations):
         if not isinstance(p, Directory):
             raise llfuse.FUSEError(errno.ENOTDIR)
 
-        fh = self._filehandles_counter
-        self._filehandles_counter += 1
+        fh = next(self._filehandles_counter)
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
             raise llfuse.FUSEError(errno.EIO)
 
         # update atime
-        p._atime = time.time()
+        self.inodes.touch(p)
 
-        self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
+    @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
 
@@ -894,23 +512,18 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.entry %s", handle.entry)
+        _logger.debug("arv-mount handle.dirobj %s", handle.obj)
 
         e = off
-        while e < len(handle.entry):
-            if handle.entry[e][1].inode in self.inodes:
-                try:
-                    yield (handle.entry[e][0].encode(self.encoding), self.getattr(handle.entry[e][1].inode), e+1)
-                except UnicodeEncodeError:
-                    pass
+        while e < len(handle.entries):
+            if handle.entries[e][1].inode in self.inodes:
+                yield (handle.entries[e][0].encode(self.inodes.encoding), self.getattr(handle.entries[e][1].inode), e+1)
             e += 1
 
-    def releasedir(self, fh):
-        del self._filehandles[fh]
-
+    @catch_exceptions
     def statfs(self):
         st = llfuse.StatvfsData()
-        st.f_bsize = 64 * 1024
+        st.f_bsize = 128 * 1024
         st.f_blocks = 0
         st.f_files = 0
 
@@ -923,12 +536,76 @@ class Operations(llfuse.Operations):
         st.f_frsize = 0
         return st
 
-    # The llfuse documentation recommends only overloading functions that
-    # are actually implemented, as the default implementation will raise ENOSYS.
-    # However, there is a bug in the llfuse default implementation of create()
-    # "create() takes exactly 5 positional arguments (6 given)" which will crash
-    # arv-mount.
-    # The workaround is to implement it with the proper number of parameters,
-    # and then everything works out.
-    def create(self, p1, p2, p3, p4, p5):
-        raise llfuse.FUSEError(errno.EROFS)
+    def _check_writable(self, inode_parent):
+        if not self.enable_write:
+            raise llfuse.FUSEError(errno.EROFS)
+
+        if inode_parent in self.inodes:
+            p = self.inodes[inode_parent]
+        else:
+            raise llfuse.FUSEError(errno.ENOENT)
+
+        if not isinstance(p, Directory):
+            raise llfuse.FUSEError(errno.ENOTDIR)
+
+        if not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        return p
+
+    @catch_exceptions
+    def create(self, inode_parent, name, mode, flags, ctx):
+        p = self._check_writable(inode_parent)
+        p.create(name)
+
+        # The file entry should have been implicitly created by the callback.
+        f = p[name]
+        fh = next(self._filehandles_counter)
+        self._filehandles[fh] = FileHandle(fh, f)
+        self.inodes.touch(p)
+
+        f.inc_ref()
+        return (fh, self.getattr(f.inode))
+
+    @catch_exceptions
+    def mkdir(self, inode_parent, name, mode, ctx):
+        _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
+
+        p = self._check_writable(inode_parent)
+        p.mkdir(name)
+
+        # The dir entry should have been implicitly created by the callback.
+        d = p[name]
+
+        d.inc_ref()
+        return self.getattr(d.inode)
+
+    @catch_exceptions
+    def unlink(self, inode_parent, name):
+        _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
+        p = self._check_writable(inode_parent)
+        p.unlink(name)
+
+    @catch_exceptions
+    def rmdir(self, inode_parent, name):
+        _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
+        p = self._check_writable(inode_parent)
+        p.rmdir(name)
+
+    @catch_exceptions
+    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
+        _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
+        src = self._check_writable(inode_parent_old)
+        dest = self._check_writable(inode_parent_new)
+        dest.rename(name_old, name_new, src)
+
+    @catch_exceptions
+    def flush(self, fh):
+        if fh in self._filehandles:
+            self._filehandles[fh].flush()
+
+    def fsync(self, fh, datasync):
+        self.flush(fh)
+
+    def fsyncdir(self, fh, datasync):
+        self.flush(fh)