Declare serialize / accept json fields, start working on states
[arvados.git] / services / fuse / arvados_fuse / fusedir.py
index 1c9f0c21bd9becdc164c7c7c475129048bd006a1..a2135b3defc299e29b3f5a284f61653d24037fe7 100644 (file)
@@ -1,7 +1,20 @@
 import logging
 import re
-
-from fresh import FreshBase, convertTime
+import time
+import llfuse
+import arvados
+import apiclient
+import functools
+import threading
+from apiclient import errors as apiclient_errors
+import errno
+import time
+
+from fusefile import StringFile, ObjectFile, FuncToJSONFile, FuseArvadosFile
+from fresh import FreshBase, convertTime, use_counter, check_update
+
+import arvados.collection
+from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -11,6 +24,7 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 # appear as underscores in the fuse mount.)
 _disallowed_filename_characters = re.compile('[\x00/]')
 
+# '.' and '..' are not reachable if API server is newer than #6277
 def sanitize_filename(dirty):
     """Replace disallowed filename characters with harmless "_"."""
     if dirty is None:
@@ -32,19 +46,22 @@ class Directory(FreshBase):
     and the value referencing a File or Directory object.
     """
 
-    def __init__(self, parent_inode):
+    def __init__(self, parent_inode, inodes):
+        """parent_inode is the integer inode number"""
+
         super(Directory, self).__init__()
 
-        """parent_inode is the integer inode number"""
         self.inode = None
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
         self.parent_inode = parent_inode
+        self.inodes = inodes
         self._entries = {}
         self._mtime = time.time()
 
     #  Overriden by subclasses to implement logic to update the entries dict
     #  when the directory is stale
+    @use_counter
     def update(self):
         pass
 
@@ -53,29 +70,40 @@ class Directory(FreshBase):
     def size(self):
         return 0
 
+    def persisted(self):
+        return False
+
     def checkupdate(self):
         if self.stale():
             try:
                 self.update()
             except apiclient.errors.HttpError as e:
-                _logger.debug(e)
+                _logger.warn(e)
 
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         return self._entries[item]
 
+    @use_counter
+    @check_update
     def items(self):
-        self.checkupdate()
-        return self._entries.items()
-
-    def __iter__(self):
-        self.checkupdate()
-        return self._entries.iterkeys()
+        return list(self._entries.items())
 
+    @use_counter
+    @check_update
     def __contains__(self, k):
-        self.checkupdate()
         return k in self._entries
 
+    @use_counter
+    @check_update
+    def __len__(self):
+        return len(self._entries)
+
+    def fresh(self):
+        self.inodes.touch(self)
+        super(Directory, self).fresh()
+
     def merge(self, items, fn, same, new_entry):
         """Helper method for updating the contents of the directory.
 
@@ -108,6 +136,7 @@ class Directory(FreshBase):
                     self._entries[name] = oldentries[name]
                     del oldentries[name]
                 else:
+                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                     # create new directory entry
                     ent = new_entry(i)
                     if ent is not None:
@@ -116,51 +145,205 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
-            llfuse.invalidate_entry(self.inode, str(i))
+            _logger.debug("Forgetting about entry '%s' on inode %i", i, self.inode)
+            self.inodes.invalidate_entry(self.inode, i.encode(self.inodes.encoding))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
         if changed:
+            self.inodes.invalidate_inode(self.inode)
             self._mtime = time.time()
 
         self.fresh()
 
-    def clear(self):
+    def clear(self, force=False):
         """Delete all entries"""
-        oldentries = self._entries
-        self._entries = {}
-        for n in oldentries:
-            if isinstance(n, Directory):
-                n.clear()
-            llfuse.invalidate_entry(self.inode, str(n))
-            self.inodes.del_entry(oldentries[n])
-        llfuse.invalidate_inode(self.inode)
-        self.invalidate()
+
+        if not self.in_use() or force:
+            oldentries = self._entries
+            self._entries = {}
+            for n in oldentries:
+                if not oldentries[n].clear(force):
+                    self._entries = oldentries
+                    return False
+            for n in oldentries:
+                self.inodes.invalidate_entry(self.inode, n.encode(self.inodes.encoding))
+                self.inodes.del_entry(oldentries[n])
+            self.inodes.invalidate_inode(self.inode)
+            self.invalidate()
+            return True
+        else:
+            return False
 
     def mtime(self):
         return self._mtime
 
+    def writable(self):
+        return False
 
-class CollectionDirectory(Directory):
-    """Represents the root of a directory tree holding a collection."""
+    def flush(self):
+        pass
 
-    def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+    def create(self, name):
+        raise NotImplementedError()
+
+    def mkdir(self, name):
+        raise NotImplementedError()
+
+    def unlink(self, name):
+        raise NotImplementedError()
+
+    def rmdir(self, name):
+        raise NotImplementedError()
+
+    def rename(self, name_old, name_new, src):
+        raise NotImplementedError()
+
+
+class CollectionDirectoryBase(Directory):
+    """Represent an Arvados Collection as a directory.
+
+    This class is used for Subcollections, and is also the base class for
+    CollectionDirectory, which implements collection loading/saving on
+    Collection records.
+
+    Most operations act only the underlying Arvados `Collection` object.  The
+    `Collection` object signals via a notify callback to
+    `CollectionDirectoryBase.on_event` that an item was added, removed or
+    modified.  FUSE inodes and directory entries are created, deleted or
+    invalidated in response to these events.
+
+    """
+
+    def __init__(self, parent_inode, inodes, collection):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
+        self.collection = collection
+
+    def new_entry(self, name, item, mtime):
+        name = sanitize_filename(name)
+        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+            if item.fuse_entry.dead is not True:
+                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.inode is None:
+                raise Exception("Reparented entry must still have valid inode")
+            item.fuse_entry.dead = False
+            self._entries[name] = item.fuse_entry
+        elif isinstance(item, arvados.collection.RichCollectionBase):
+            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
+            self._entries[name].populate(mtime)
+        else:
+            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+        item.fuse_entry = self._entries[name]
+
+    def on_event(self, event, collection, name, item):
+        if collection == self.collection:
+            name = sanitize_filename(name)
+            _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
+            with llfuse.lock:
+                if event == arvados.collection.ADD:
+                    self.new_entry(name, item, self.mtime())
+                elif event == arvados.collection.DEL:
+                    ent = self._entries[name]
+                    del self._entries[name]
+                    self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
+                    self.inodes.del_entry(ent)
+                elif event == arvados.collection.MOD:
+                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+                        self.inodes.invalidate_inode(item.fuse_entry.inode)
+                    elif name in self._entries:
+                        self.inodes.invalidate_inode(self._entries[name].inode)
+
+    def populate(self, mtime):
+        self._mtime = mtime
+        self.collection.subscribe(self.on_event)
+        for entry, item in self.collection.items():
+            self.new_entry(entry, item, self.mtime())
+
+    def writable(self):
+        return self.collection.writable()
+
+    @use_counter
+    def flush(self):
+        with llfuse.lock_released:
+            self.collection.root_collection().save()
+
+    @use_counter
+    @check_update
+    def create(self, name):
+        with llfuse.lock_released:
+            self.collection.open(name, "w").close()
+
+    @use_counter
+    @check_update
+    def mkdir(self, name):
+        with llfuse.lock_released:
+            self.collection.mkdirs(name)
+
+    @use_counter
+    @check_update
+    def unlink(self, name):
+        with llfuse.lock_released:
+            self.collection.remove(name)
+        self.flush()
+
+    @use_counter
+    @check_update
+    def rmdir(self, name):
+        with llfuse.lock_released:
+            self.collection.remove(name)
+        self.flush()
+
+    @use_counter
+    @check_update
+    def rename(self, name_old, name_new, src):
+        if not isinstance(src, CollectionDirectoryBase):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            ent = src[name_old]
+            tgt = self[name_new]
+            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
+                pass
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
+                if len(tgt) > 0:
+                    raise llfuse.FUSEError(errno.ENOTEMPTY)
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
+                raise llfuse.FUSEError(errno.ENOTDIR)
+            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
+                raise llfuse.FUSEError(errno.EISDIR)
+
+        with llfuse.lock_released:
+            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+        self.flush()
+        src.flush()
+
+
+class CollectionDirectory(CollectionDirectoryBase):
+    """Represents the root of a directory tree representing a collection."""
+
+    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
         self.api = api
         self.num_retries = num_retries
-        self.collection_object_file = None
-        self.collection_object = None
-        if isinstance(collection, dict):
-            self.collection_locator = collection['uuid']
-            self._mtime = convertTime(collection.get('modified_at'))
+        self.collection_record_file = None
+        self.collection_record = None
+        if isinstance(collection_record, dict):
+            self.collection_locator = collection_record['uuid']
+            self._mtime = convertTime(collection_record.get('modified_at'))
         else:
-            self.collection_locator = collection
+            self.collection_locator = collection_record
             self._mtime = 0
+        self._manifest_size = 0
+        if self.collection_locator:
+            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+        self._updating_lock = threading.Lock()
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
+    def writable(self):
+        return self.collection.writable() if self.collection is not None else self._writable
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -169,78 +352,99 @@ class CollectionDirectory(Directory):
         """
 
         self.collection_locator = new_locator
-        self.collection_object = None
+        self.collection_record = None
         self.update()
 
-    def new_collection(self, new_collection_object, coll_reader):
-        self.collection_object = new_collection_object
+    def new_collection(self, new_collection_record, coll_reader):
+        if self.inode:
+            self.clear(force=True)
 
-        self._mtime = convertTime(self.collection_object.get('modified_at'))
+        self.collection_record = new_collection_record
 
-        if self.collection_object_file is not None:
-            self.collection_object_file.update(self.collection_object)
+        if self.collection_record:
+            self._mtime = convertTime(self.collection_record.get('modified_at'))
+            self.collection_locator = self.collection_record["uuid"]
+            if self.collection_record_file is not None:
+                self.collection_record_file.update(self.collection_record)
 
-        self.clear()
-        for s in coll_reader.all_streams():
-            cwd = self
-            for part in s.name().split('/'):
-                if part != '' and part != '.':
-                    partname = sanitize_filename(part)
-                    if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode))
-                    cwd = cwd._entries[partname]
-            for k, v in s.files().items():
-                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
+        self.collection = coll_reader
+        self.populate(self.mtime())
 
-    def update(self):
+    def uuid(self):
+        return self.collection_locator
+
+    @use_counter
+    def update(self, to_record_version=None):
         try:
-            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
+            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
             if self.collection_locator is None:
                 self.fresh()
                 return True
 
-            with llfuse.lock_released:
-                coll_reader = arvados.CollectionReader(
-                    self.collection_locator, self.api, self.api.keep,
-                    num_retries=self.num_retries)
-                new_collection_object = coll_reader.api_response() or {}
-                # If the Collection only exists in Keep, there will be no API
-                # response.  Fill in the fields we need.
-                if 'uuid' not in new_collection_object:
-                    new_collection_object['uuid'] = self.collection_locator
-                if "portable_data_hash" not in new_collection_object:
-                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
-                if 'manifest_text' not in new_collection_object:
-                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
-                coll_reader.normalize()
-            # end with llfuse.lock_released, re-acquire lock
-
-            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object, coll_reader)
+            try:
+                with llfuse.lock_released:
+                    self._updating_lock.acquire()
+                    if not self.stale():
+                        return
+
+                    _logger.debug("Updating %s", to_record_version)
+                    if self.collection is not None:
+                        if self.collection.known_past_version(to_record_version):
+                            _logger.debug("%s already processed %s", self.collection_locator, to_record_version)
+                        else:
+                            self.collection.update()
+                    else:
+                        if uuid_pattern.match(self.collection_locator):
+                            coll_reader = arvados.collection.Collection(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        else:
+                            coll_reader = arvados.collection.CollectionReader(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        new_collection_record = coll_reader.api_response() or {}
+                        # If the Collection only exists in Keep, there will be no API
+                        # response.  Fill in the fields we need.
+                        if 'uuid' not in new_collection_record:
+                            new_collection_record['uuid'] = self.collection_locator
+                        if "portable_data_hash" not in new_collection_record:
+                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                        if 'manifest_text' not in new_collection_record:
+                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+                            self.new_collection(new_collection_record, coll_reader)
+
+                        self._manifest_size = len(coll_reader.manifest_text())
+                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
+                # end with llfuse.lock_released, re-acquire lock
 
-            self.fresh()
-            return True
-        except arvados.errors.NotFoundError:
-            _logger.exception("arv-mount %s: error", self.collection_locator)
+                self.fresh()
+                return True
+            finally:
+                self._updating_lock.release()
+        except arvados.errors.NotFoundError as e:
+            _logger.error("Error fetching collection '%s': %s", self.collection_locator, e)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         return False
 
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#collection':
-            if self.collection_object_file is None:
-                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
-                self.inodes.add_entry(self.collection_object_file)
-            return self.collection_object_file
+            if self.collection_record_file is None:
+                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
 
@@ -250,6 +454,95 @@ class CollectionDirectory(Directory):
         else:
             return super(CollectionDirectory, self).__contains__(k)
 
+    def invalidate(self):
+        self.collection_record = None
+        self.collection_record_file = None
+        super(CollectionDirectory, self).invalidate()
+
+    def persisted(self):
+        return (self.collection_locator is not None)
+
+    def objsize(self):
+        # This is an empirically-derived heuristic to estimate the memory used
+        # to store this collection's metadata.  Calculating the memory
+        # footprint directly would be more accurate, but also more complicated.
+        return self._manifest_size * 128
+
+    def finalize(self):
+        if self.collection is not None:
+            if self.writable():
+                self.collection.save()
+            self.collection.stop_threads()
+
+
+class TmpCollectionDirectory(CollectionDirectoryBase):
+    """A directory backed by an Arvados collection that never gets saved.
+
+    This supports using Keep as scratch space. A userspace program can
+    read the .arvados#collection file to get a current manifest in
+    order to save a snapshot of the scratch data or use it as a crunch
+    job output.
+    """
+
+    class UnsaveableCollection(arvados.collection.Collection):
+        def save(self):
+            pass
+        def save_new(self):
+            pass
+
+    def __init__(self, parent_inode, inodes, api_client, num_retries):
+        collection = self.UnsaveableCollection(
+            api_client=api_client,
+            keep_client=api_client.keep)
+        super(TmpCollectionDirectory, self).__init__(
+            parent_inode, inodes, collection)
+        self.collection_record_file = None
+        self.populate(self.mtime())
+
+    def on_event(self, *args, **kwargs):
+        super(TmpCollectionDirectory, self).on_event(*args, **kwargs)
+        if self.collection_record_file:
+            with llfuse.lock:
+                self.collection_record_file.invalidate()
+            self.inodes.invalidate_inode(self.collection_record_file.inode)
+            _logger.debug("%s invalidated collection record", self)
+
+    def collection_record(self):
+        with llfuse.lock_released:
+            return {
+                "uuid": None,
+                "manifest_text": self.collection.manifest_text(),
+                "portable_data_hash": self.collection.portable_data_hash(),
+            }
+
+    def __contains__(self, k):
+        return (k == '.arvados#collection' or
+                super(TmpCollectionDirectory, self).__contains__(k))
+
+    @use_counter
+    def __getitem__(self, item):
+        if item == '.arvados#collection':
+            if self.collection_record_file is None:
+                self.collection_record_file = FuncToJSONFile(
+                    self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
+        return super(TmpCollectionDirectory, self).__getitem__(item)
+
+    def persisted(self):
+        return False
+
+    def writable(self):
+        return True
+
+    def finalize(self):
+        self.collection.stop_threads()
+
+    def invalidate(self):
+        if self.collection_record_file:
+            self.collection_record_file.invalidate()
+        super(TmpCollectionDirectory, self).invalidate()
+
 
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
@@ -265,19 +558,20 @@ class MagicDirectory(Directory):
     README_TEXT = """
 This directory provides access to Arvados collections as subdirectories listed
 by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdefghijklmnopqrstuv+123').
+the form '1234567890abcdef0123456789abcdef+123').
 
 Note that this directory will appear empty until you attempt to access a
 specific collection subdirectory (such as trying to 'cd' into it), at which
 point the collection will actually be looked up on the server and the directory
 will appear if it exists.
+
 """.lstrip()
 
-    def __init__(self, parent_inode, inodes, api, num_retries):
-        super(MagicDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+    def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False):
+        super(MagicDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
+        self.pdh_only = pdh_only
 
     def __setattr__(self, name, value):
         super(MagicDirectory, self).__setattr__(name, value)
@@ -289,25 +583,31 @@ will appear if it exists.
             # If we're the root directory, add an identical by_id subdirectory.
             if self.inode == llfuse.ROOT_INODE:
                 self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
-                        self.inode, self.inodes, self.api, self.num_retries))
+                        self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
 
     def __contains__(self, k):
         if k in self._entries:
             return True
 
-        if not portable_data_hash_pattern.match(k) and not uuid_pattern.match(k):
+        if not portable_data_hash_pattern.match(k) and (self.pdh_only or not uuid_pattern.match(k)):
             return False
 
         try:
             e = self.inodes.add_entry(CollectionDirectory(
                     self.inode, self.inodes, self.api, self.num_retries, k))
+
             if e.update():
-                self._entries[k] = e
+                if k not in self._entries:
+                    self._entries[k] = e
+                else:
+                    self.inodes.del_entry(e)
                 return True
             else:
+                self.inodes.del_entry(e)
                 return False
         except Exception as e:
             _logger.debug('arv-mount exception keep %s', e)
+            self.inodes.del_entry(e)
             return False
 
     def __getitem__(self, item):
@@ -316,33 +616,31 @@ will appear if it exists.
         else:
             raise KeyError("No collection with id " + item)
 
+    def clear(self, force=False):
+        pass
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
-        if self.inode == llfuse.ROOT_INODE:
-            llfuse.lock.acquire()
         try:
             super(RecursiveInvalidateDirectory, self).invalidate()
             for a in self._entries:
                 self._entries[a].invalidate()
         except Exception:
             _logger.exception()
-        finally:
-            if self.inode == llfuse.ROOT_INODE:
-                llfuse.lock.release()
 
 
 class TagsDirectory(RecursiveInvalidateDirectory):
     """A special directory that contains as subdirectories all tags visible to the user."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
-        super(TagsDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagsDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
@@ -363,14 +661,14 @@ class TagDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, tag,
                  poll=False, poll_time=60):
-        super(TagDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(TagDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.tag = tag
         self._poll = poll
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             taggedcollections = self.api.links().list(
@@ -390,15 +688,16 @@ class ProjectDirectory(Directory):
 
     def __init__(self, parent_inode, inodes, api, num_retries, project_object,
                  poll=False, poll_time=60):
-        super(ProjectDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(ProjectDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
-        self.uuid = project_object['uuid']
+        self.project_uuid = project_object['uuid']
         self._poll = poll
         self._poll_time = poll_time
+        self._updating_lock = threading.Lock()
+        self._current_user = None
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -415,6 +714,10 @@ class ProjectDirectory(Directory):
         else:
             return None
 
+    def uuid(self):
+        return self.project_uuid
+
+    @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
@@ -437,39 +740,40 @@ class ProjectDirectory(Directory):
                 return None
 
         def samefn(a, i):
-            if isinstance(a, CollectionDirectory):
-                return a.collection_locator == i['uuid']
-            elif isinstance(a, ProjectDirectory):
-                return a.uuid == i['uuid']
+            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
+                return a.uuid() == i['uuid']
             elif isinstance(a, ObjectFile):
-                return a.uuid == i['uuid'] and not a.stale()
+                return a.uuid() == i['uuid'] and not a.stale()
             return False
 
-        with llfuse.lock_released:
-            if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-            elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-
-            contents = arvados.util.list_all(self.api.groups().contents,
-                                             self.num_retries, uuid=self.uuid)
-            # Name links will be obsolete soon, take this out when there are no more pre-#3036 in use.
-            contents += arvados.util.list_all(
-                self.api.links().list, self.num_retries,
-                filters=[['tail_uuid', '=', self.uuid],
-                         ['link_class', '=', 'name']])
+        try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
+                if not self.stale():
+                    return
 
-        # end with llfuse.lock_released, re-acquire lock
+                if group_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.groups().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
+                elif user_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.users().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
+
+                contents = arvados.util.list_all(self.api.groups().contents,
+                                                 self.num_retries, uuid=self.project_uuid)
 
-        self.merge(contents,
-                   namefn,
-                   samefn,
-                   self.createDirectory)
+            # end with llfuse.lock_released, re-acquire lock
 
+            self.merge(contents,
+                       namefn,
+                       samefn,
+                       self.createDirectory)
+        finally:
+            self._updating_lock.release()
+
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#project':
             return self.project_object_file
         else:
@@ -481,20 +785,84 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    @use_counter
+    @check_update
+    def writable(self):
+        with llfuse.lock_released:
+            if not self._current_user:
+                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
+            return self._current_user["uuid"] in self.project_object["writable_by"]
+
+    def persisted(self):
+        return True
+
+    @use_counter
+    @check_update
+    def mkdir(self, name):
+        try:
+            with llfuse.lock_released:
+                self.api.collections().create(body={"owner_uuid": self.project_uuid,
+                                                    "name": name,
+                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
+            self.invalidate()
+        except apiclient_errors.Error as error:
+            _logger.error(error)
+            raise llfuse.FUSEError(errno.EEXIST)
+
+    @use_counter
+    @check_update
+    def rmdir(self, name):
+        if name not in self:
+            raise llfuse.FUSEError(errno.ENOENT)
+        if not isinstance(self[name], CollectionDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+        if len(self[name]) > 0:
+            raise llfuse.FUSEError(errno.ENOTEMPTY)
+        with llfuse.lock_released:
+            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
+        self.invalidate()
+
+    @use_counter
+    @check_update
+    def rename(self, name_old, name_new, src):
+        if not isinstance(src, ProjectDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        ent = src[name_old]
+
+        if not isinstance(ent, CollectionDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            # POSIX semantics for replacing one directory with another is
+            # tricky (the target directory must be empty, the operation must be
+            # atomic which isn't possible with the Arvados API as of this
+            # writing) so don't support that.
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.api.collections().update(uuid=ent.uuid(),
+                                      body={"owner_uuid": self.uuid(),
+                                            "name": name_new}).execute(num_retries=self.num_retries)
+
+        # Acually move the entry from source directory to this directory.
+        del src._entries[name_old]
+        self._entries[name_new] = ent
+        self.inodes.invalidate_entry(src.inode, name_old.encode(self.inodes.encoding))
+
 
 class SharedDirectory(Directory):
     """A special directory that represents users or groups who have shared projects with me."""
 
     def __init__(self, parent_inode, inodes, api, num_retries, exclude,
                  poll=False, poll_time=60):
-        super(SharedDirectory, self).__init__(parent_inode)
-        self.inodes = inodes
+        super(SharedDirectory, self).__init__(parent_inode, inodes)
         self.api = api
         self.num_retries = num_retries
         self.current_user = api.users().current().execute(num_retries=num_retries)
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             all_projects = arvados.util.list_all(
@@ -530,11 +898,14 @@ class SharedDirectory(Directory):
             for r in root_owners:
                 if r in objects:
                     obr = objects[r]
-                    if "name" in obr:
+                    if obr.get("name"):
                         contents[obr["name"]] = obr
-                    if "first_name" in obr:
+                    #elif obr.get("username"):
+                    #    contents[obr["username"]] = obr
+                    elif "first_name" in obr:
                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
 
+
             for r in roots:
                 if r['owner_uuid'] not in objects:
                     contents[r['name']] = r
@@ -544,7 +915,7 @@ class SharedDirectory(Directory):
         try:
             self.merge(contents.items(),
                        lambda i: i[0],
-                       lambda a, i: a.uuid == i[1]['uuid'],
+                       lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()