Merge branch 'master' into 3198-writable-fuse
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 12 May 2015 17:05:38 +0000 (13:05 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 12 May 2015 17:05:38 +0000 (13:05 -0400)
Conflicts:
sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py
services/fuse/arvados_fuse/__init__.py

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/setup.py
services/fuse/tests/test_inodes.py
services/fuse/tests/test_mount.py

index 2e0eaa8ce0f645ca305826851c5eb7ddccb434e2..c086e5fd3a5b51550f46dce842e3814bb987457e 100644 (file)
@@ -9,6 +9,7 @@ import Queue
 import copy
 import errno
 import re
+import logging
 
 from .errors import KeepWriteError, AssertionError
 from .keep import KeepLocator
@@ -16,6 +17,10 @@ from ._normalize_stream import normalize_stream
 from ._ranges import locators_and_ranges, replace_range, Range
 from .retry import retry_method
 
+MOD = "mod"
+
+_logger = logging.getLogger('arvados.arvfile')
+
 def split(path):
     """split(path) -> streamname, filename
 
@@ -349,7 +354,7 @@ def must_be_writable(orig_func):
     @functools.wraps(orig_func)
     def must_be_writable_wrapper(self, *args, **kwargs):
         if not self.writable():
-            raise IOError((errno.EROFS, "Collection must be writable."))
+            raise IOError(errno.EROFS, "Collection must be writable.")
         return orig_func(self, *args, **kwargs)
     return must_be_writable_wrapper
 
@@ -436,11 +441,17 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self._prefetch_queue = None
 
-    def commit_bufferblock(self, block):
+    def commit_bufferblock(self, block, wait):
         """Initiate a background upload of a bufferblock.
 
-        This will block if the upload queue is at capacity, otherwise it will
-        return immediately.
+        :block:
+          The block object to upload
+
+        :wait:
+          If `wait` is True, upload the block synchronously.
+          If `wait` is False, upload the block asynchronously.  This will
+          return immediately unless if the upload queue is at capacity, in
+          which case it will wait on an upload queue slot.
 
         """
 
@@ -452,6 +463,7 @@ class _BlockManager(object):
                     bufferblock = self._put_queue.get()
                     if bufferblock is None:
                         return
+
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
@@ -461,30 +473,37 @@ class _BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()
 
-        with self.lock:
-            if self._put_threads is None:
-                # Start uploader threads.
-
-                # If we don't limit the Queue size, the upload queue can quickly
-                # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
-                # servers.
-                #
-                # With two upload threads and a queue size of 2, this means up to 4
-                # blocks pending.  If they are full 64 MiB blocks, that means up to
-                # 256 MiB of internal buffering, which is the same size as the
-                # default download block cache in KeepClient.
-                self._put_queue = Queue.Queue(maxsize=2)
-                self._put_errors = Queue.Queue()
-
-                self._put_threads = []
-                for i in xrange(0, self.num_put_threads):
-                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
-                    self._put_threads.append(thread)
-                    thread.daemon = True
-                    thread.start()
+        if block.state() != _BufferBlock.WRITABLE:
+            return
+
+        if wait:
+            block.set_state(_BufferBlock.PENDING)
+            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+            block.set_state(_BufferBlock.COMMITTED, loc)
+        else:
+            with self.lock:
+                if self._put_threads is None:
+                    # Start uploader threads.
+
+                    # If we don't limit the Queue size, the upload queue can quickly
+                    # grow to take up gigabytes of RAM if the writing process is
+                    # generating data more quickly than it can be send to the Keep
+                    # servers.
+                    #
+                    # With two upload threads and a queue size of 2, this means up to 4
+                    # blocks pending.  If they are full 64 MiB blocks, that means up to
+                    # 256 MiB of internal buffering, which is the same size as the
+                    # default download block cache in KeepClient.
+                    self._put_queue = Queue.Queue(maxsize=2)
+                    self._put_errors = Queue.Queue()
+
+                    self._put_threads = []
+                    for i in xrange(0, self.num_put_threads):
+                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+                        self._put_threads.append(thread)
+                        thread.daemon = True
+                        thread.start()
 
-        if block.state() == _BufferBlock.WRITABLE:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
             self._put_queue.put(block)
@@ -524,7 +543,8 @@ class _BlockManager(object):
             items = self._bufferblocks.items()
 
         for k,v in items:
-            v.owner.flush()
+            if v.state() == _BufferBlock.WRITABLE:
+                v.owner.flush(False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -588,7 +608,7 @@ class ArvadosFile(object):
 
     """
 
-    def __init__(self, parent, stream=[], segments=[]):
+    def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
 
@@ -599,6 +619,7 @@ class ArvadosFile(object):
           a list of Range objects representing segments
         """
         self.parent = parent
+        self.name = name
         self._modified = True
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -614,9 +635,9 @@ class ArvadosFile(object):
         return copy.copy(self._segments)
 
     @synchronized
-    def clone(self, new_parent):
+    def clone(self, new_parent, new_name):
         """Make a copy of this file."""
-        cp = ArvadosFile(new_parent)
+        cp = ArvadosFile(new_parent, new_name)
         cp.replace_contents(self)
         return cp
 
@@ -713,8 +734,7 @@ class ArvadosFile(object):
             self._segments = new_segs
             self._modified = True
         elif size > self.size():
-            raise IOError("truncate() does not support extending the file size")
-
+            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
     def readfrom(self, offset, size, num_retries, exact=False):
         """Read up to `size` bytes from the file starting at `offset`.
@@ -795,18 +815,24 @@ class ArvadosFile(object):
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
             self._repack_writes(num_retries)
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         self._current_bblock.append(data)
 
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
+        self.parent.notify(MOD, self.parent, self.name, (self, self))
+
+        return len(data)
+
     @synchronized
-    def flush(self, num_retries=0):
-        if self._current_bblock:
-            self._repack_writes(num_retries)
-            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+    def flush(self, wait=True, num_retries=0):
+        if self.modified():
+            if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
+                self._repack_writes(num_retries)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
+            self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
     @synchronized
@@ -852,6 +878,16 @@ class ArvadosFile(object):
         buf += "\n"
         return buf
 
+    @must_be_writable
+    @synchronized
+    def reparent(self, newparent, newname):
+        self.flush()
+        self.parent.remove(self.name)
+
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+        self._modified = True
 
 class ArvadosFileReader(ArvadosFileReaderBase):
     """Wraps ArvadosFile in a file-like object supporting reading only.
@@ -861,8 +897,8 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     """
 
-    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
-        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile,  mode="r", num_retries=None):
+        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
         self.arvadosfile = arvadosfile
 
     def size(self):
@@ -873,16 +909,32 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     @_FileLikeObjectBase._before_close
     @retry_method
-    def read(self, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position."""
-        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
-        self._filepos += len(data)
-        return data
+    def read(self, size=None, num_retries=None):
+        """Read up to `size` bytes from the file and return the result.
+
+        Starts at the current file position.  If `size` is None, read the
+        entire remainder of the file.
+        """
+        if size is None:
+            data = []
+            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+            while rd:
+                data.append(rd)
+                self._filepos += len(rd)
+                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+            return ''.join(data)
+        else:
+            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+            self._filepos += len(data)
+            return data
 
     @_FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position."""
+        """Read up to `size` bytes from the stream, starting at the specified file offset.
+
+        This method does not change the file position.
+        """
         return self.arvadosfile.readfrom(offset, size, num_retries)
 
     def flush(self):
@@ -897,8 +949,8 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     """
 
-    def __init__(self, arvadosfile, name, mode, num_retries=None):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile, mode, num_retries=None):
+        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -908,6 +960,7 @@ class ArvadosFileWriter(ArvadosFileReader):
         else:
             self.arvadosfile.writeto(self._filepos, data, num_retries)
             self._filepos += len(data)
+        return len(data)
 
     @_FileLikeObjectBase._before_close
     @retry_method
index 30828732d8d4908ca0922bc780c11d2a6943578e..89cbda937c59fbb5064d9c81644b07dba623f806 100644 (file)
@@ -488,6 +488,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._modified = True
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
@@ -537,9 +538,9 @@ class RichCollectionBase(CollectionBase):
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
@@ -547,14 +548,14 @@ class RichCollectionBase(CollectionBase):
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                    raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
         else:
             return self
 
@@ -580,9 +581,9 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
-                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
 
-    def mkdirs(path):
+    def mkdirs(self, path):
         """Recursive subcollection create.
 
         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
@@ -615,7 +616,7 @@ class RichCollectionBase(CollectionBase):
         create = (mode != "r")
 
         if create and not self.writable():
-            raise IOError((errno.EROFS, "Collection is read only"))
+            raise IOError(errno.EROFS, "Collection is read only")
 
         if create:
             arvfile = self.find_or_create(path, FILE)
@@ -623,9 +624,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+            raise IOError(errno.EISDIR, "Path must refer to a file.")
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -633,9 +634,9 @@ class RichCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
     @synchronized
     def modified(self):
@@ -720,10 +721,10 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             self._modified = True
@@ -733,15 +734,15 @@ class RichCollectionBase(CollectionBase):
 
     def _clonefrom(self, source):
         for k,v in source.items():
-            self._items[k] = v.clone(self)
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False):
-        """Copy a file or subcollection to this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
         :source_obj:
           An ArvadosFile, or Subcollection object
@@ -753,24 +754,74 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
 
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
 
         if target_name in self and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+            raise IOError(errno.EEXIST, "File already exists")
 
         modified_from = None
         if target_name in self:
             modified_from = self[target_name]
 
-        # Actually make the copy.
-        dup = source_obj.clone(self)
-        self._items[target_name] = dup
+        # Actually make the move or copy.
+        if reparent:
+            source_obj.reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
         self._modified = True
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        if source_collection is None:
+            source_collection = self
+
+        # Find the object
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found")
+            sourcecomponents = source.split("/")
+        else:
+            source_obj = source
+            sourcecomponents = None
+
+        # Find parent collection the target path
+        targetcomponents = target_path.split("/")
+
+        # Determine the name to use.
+        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
         else:
-            self.notify(ADD, self, target_name, dup)
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found.")
+
+        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
@@ -792,35 +843,35 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
-        if source_collection is None:
-            source_collection = self
 
-        # Find the object to copy
-        if isinstance(source, basestring):
-            source_obj = source_collection.find(source)
-            if source_obj is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            sourcecomponents = source.split("/")
-        else:
-            source_obj = source
-            sourcecomponents = None
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)
 
-        # Find parent collection the target path
-        targetcomponents = target_path.split("/")
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
 
-        # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        :source:
+          A string with a path to source file or subcollection.
 
-        if not target_name:
-            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
 
-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        :source_collection:
+          Collection to copy `source_path` from (default `self`)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
-            target_dir = target_dir[target_name]
-            target_name = sourcecomponents[-1]
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
 
-        target_dir.add(source_obj, target_name, overwrite)
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection must be writable.")
+        target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         """Get the manifest text for this collection, sub collections and files.
@@ -917,15 +968,15 @@ class RichCollectionBase(CollectionBase):
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
@@ -980,6 +1031,24 @@ class RichCollectionBase(CollectionBase):
         stripped = self.portable_manifest_text()
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
+
     @synchronized
     def __eq__(self, other):
         if other is self:
@@ -1082,7 +1151,6 @@ class Collection(RichCollectionBase):
         self._api_response = None
 
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
@@ -1215,7 +1283,7 @@ class Collection(RichCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if exc_type is not None:
+        if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
         if self._block_manager is not None:
@@ -1236,7 +1304,7 @@ class Collection(RichCollectionBase):
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1362,6 +1430,7 @@ class Collection(RichCollectionBase):
         if create_collection_record:
             if name is None:
                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
                     "name": name}
@@ -1378,19 +1447,6 @@ class Collection(RichCollectionBase):
 
         return text
 
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
-
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
-
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1420,6 +1476,7 @@ class Collection(RichCollectionBase):
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
+                self.mkdirs(stream_name)
                 continue
 
             if state == BLOCKS:
@@ -1453,6 +1510,11 @@ class Collection(RichCollectionBase):
 
         self.set_unmodified()
 
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1462,10 +1524,12 @@ class Subcollection(RichCollectionBase):
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
 
     def root_collection(self):
         return self.parent.root_collection()
@@ -1482,18 +1546,12 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
 
index 1701aa43caae5baaa3f25522230c5b9c4ac46994..c5dbb1692aee231a8336f2f3b423e6d1e25bc2f5 100644 (file)
@@ -437,8 +437,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
-        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
-        return ArvadosFileReader(af, "count.txt")
+        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+        return ArvadosFileReader(af)
 
     def test_read_block_crossing_behavior(self):
         # read() needs to return all the data requested if possible, even if it
@@ -477,8 +477,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__eq__from_writes(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("0123456789")
+                f = c2.open("count1.txt", "w")
+                f.write("0123456789")
 
                 self.assertTrue(c1["count1.txt"] == c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] != c2["count1.txt"])
@@ -486,8 +486,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__ne__(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("1234567890")
+                f = c2.open("count1.txt", "w")
+                f.write("1234567890")
 
                 self.assertTrue(c1["count1.txt"] != c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] == c2["count1.txt"])
@@ -510,10 +510,10 @@ class ArvadosFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
         blockmanager = arvados.arvfile._BlockManager(self.keep_client())
         blockmanager.prefetch_enabled = False
         col = Collection(keep_client=self.keep_client(), block_manager=blockmanager)
-        af = ArvadosFile(col,
+        af = ArvadosFile(col, "test",
                          stream=stream,
                          segments=segments)
-        return ArvadosFileReader(af, "test", **kwargs)
+        return ArvadosFileReader(af, **kwargs)
 
     def read_for_test(self, reader, byte_count, **kwargs):
         return reader.read(byte_count, **kwargs)
@@ -597,7 +597,7 @@ class BlockManagerTest(unittest.TestCase):
         blockmanager = arvados.arvfile._BlockManager(mockkeep)
         bufferblock = blockmanager.alloc_bufferblock()
         bufferblock.owner = mock.MagicMock()
-        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
         bufferblock.append("foo")
         blockmanager.commit_all()
         self.assertTrue(bufferblock.owner.flush.called)
@@ -612,7 +612,7 @@ class BlockManagerTest(unittest.TestCase):
         blockmanager = arvados.arvfile._BlockManager(mockkeep)
         bufferblock = blockmanager.alloc_bufferblock()
         bufferblock.owner = mock.MagicMock()
-        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
         bufferblock.append("foo")
         with self.assertRaises(arvados.errors.KeepWriteError) as err:
             blockmanager.commit_all()
index 5e1a055d483695c864df6adaf0572368b0f0e165..8bf08d0e1f666c4bde84d248c6dc0ad69bb1576c 100644 (file)
@@ -883,6 +883,24 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c.copy("count1.txt", "foo/")
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
 
+    def test_rename_file(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.rename("count1.txt", "count2.txt")
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_dir(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.mkdirs("foo")
+        c.rename("count1.txt", "foo/count2.txt")
+        self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_other(self):
+        c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c2 = Collection()
+        c2.rename("count1.txt", "count2.txt", source_collection=c1)
+        self.assertEqual("", c1.manifest_text())
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
+
     def test_clone(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
         cl = c.clone()
@@ -982,8 +1000,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
                              ('add', './count2.txt', c2["count2.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so it should not be deleted.
         c1.apply(d)
@@ -994,8 +1012,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
         d = c1.diff(c2)
         self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so c2 mod will go to a conflict file
         c1.apply(d)
@@ -1007,8 +1025,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
                              ('add', './count1.txt', c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 added count1.txt, so c2 add will go to a conflict file
         c1.apply(d)
index 8d048487928c54f57dd944c083ee76ffb05c01d1..b85e65d74f7c3575c40a58c3901718fe9bd88419 100644 (file)
@@ -22,43 +22,54 @@ import threading
 import itertools
 import ciso8601
 import collections
+import functools
 
-from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
-from fusefile import StreamReaderFile, StringFile
+from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
+log_handler = logging.StreamHandler()
+llogger = logging.getLogger('llfuse')
+llogger.addHandler(log_handler)
+llogger.setLevel(logging.DEBUG)
 
-class FileHandle(object):
-    """Connects a numeric file handle to a File object that has
+class Handle(object):
+    """Connects a numeric file handle to a File or Directory object that has
     been opened by the client."""
 
-    def __init__(self, fh, fileobj):
+    def __init__(self, fh, obj):
         self.fh = fh
-        self.fileobj = fileobj
-        self.fileobj.inc_use()
+        self.obj = obj
+        self.obj.inc_use()
 
     def release(self):
-        self.fileobj.dec_use()
+        self.obj.dec_use()
 
+    def flush(self):
+        with llfuse.lock_released:
+            return self.obj.flush()
 
-class DirectoryHandle(object):
+
+class FileHandle(Handle):
+    """Connects a numeric file handle to a File  object that has
+    been opened by the client."""
+    pass
+
+
+class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
     been opened by the client."""
 
     def __init__(self, fh, dirobj, entries):
-        self.fh = fh
+        super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
-        self.dirobj = dirobj
-        self.dirobj.inc_use()
-
-    def release(self):
-        self.dirobj.dec_use()
 
 
 class InodeCache(object):
     def __init__(self, cap, min_entries=4):
         self._entries = collections.OrderedDict()
+        self._by_uuid = {}
         self._counter = itertools.count(1)
         self.cap = cap
         self._total = 0
@@ -73,6 +84,9 @@ class InodeCache(object):
             return False
         self._total -= obj.cache_size
         del self._entries[obj.cache_priority]
+        if obj.cache_uuid:
+            del self._by_uuid[obj.cache_uuid]
+            obj.cache_uuid = None
         _logger.debug("Cleared %s total now %i", obj, self._total)
         return True
 
@@ -89,9 +103,14 @@ class InodeCache(object):
             obj.cache_priority = next(self._counter)
             obj.cache_size = obj.objsize()
             self._entries[obj.cache_priority] = obj
+            obj.cache_uuid = obj.uuid()
+            if obj.cache_uuid:
+                self._by_uuid[obj.cache_uuid] = obj
             self._total += obj.objsize()
             _logger.debug("Managing %s total now %i", obj, self._total)
             self.cap_cache()
+        else:
+            obj.cache_priority = None
 
     def touch(self, obj):
         if obj.persisted():
@@ -104,6 +123,9 @@ class InodeCache(object):
         if obj.persisted() and obj.cache_priority in self._entries:
             self._remove(obj, True)
 
+    def find(self, uuid):
+        return self._by_uuid.get(uuid)
+
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
@@ -139,9 +161,29 @@ class Inodes(object):
         return entry
 
     def del_entry(self, entry):
-        self.inode_cache.unmanage(entry)
-        llfuse.invalidate_inode(entry.inode)
-        del self._entries[entry.inode]
+        if entry.ref_count == 0:
+            _logger.warn("Deleting inode %i", entry.inode)
+            self.inode_cache.unmanage(entry)
+            llfuse.invalidate_inode(entry.inode)
+            del self._entries[entry.inode]
+        else:
+            _logger.warn("Inode %i has refcount %i", entry.inode, entry.ref_count)
+            entry.dead = True
+
+def catch_exceptions(orig_func):
+    @functools.wraps(orig_func)
+    def catch_exceptions_wrapper(self, *args, **kwargs):
+        try:
+            return orig_func(self, *args, **kwargs)
+        except llfuse.FUSEError:
+            raise
+        except EnvironmentError as e:
+            raise llfuse.FUSEError(e.errno)
+        except:
+            _logger.exception("Unhandled exception during FUSE operation")
+            raise llfuse.FUSEError(errno.EIO)
+
+    return catch_exceptions_wrapper
 
 
 class Operations(llfuse.Operations):
@@ -156,7 +198,7 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None):
+    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
         super(Operations, self).__init__()
 
         if not inode_cache:
@@ -174,14 +216,41 @@ class Operations(llfuse.Operations):
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
+        self.num_retries = num_retries
+
+        self.events = None
+
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue
         self.initlock.set()
 
+    def destroy(self):
+        if self.events:
+            self.events.close()
+
     def access(self, inode, mode, ctx):
         return True
 
+    def listen_for_events(self, api_client):
+        self.event = arvados.events.subscribe(api_client,
+                                 [["event_type", "in", ["create", "update", "delete"]]],
+                                 self.on_event)
+
+    def on_event(self, ev):
+        if 'event_type' in ev:
+            with llfuse.lock:
+                item = self.inodes.inode_cache.find(ev["object_uuid"])
+                if item:
+                    item.invalidate()
+                    item.update()
+
+                itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
+                if itemparent:
+                    itemparent.invalidate()
+                    itemparent.update()
+
+    @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
             raise llfuse.FUSEError(errno.ENOENT)
@@ -197,10 +266,13 @@ class Operations(llfuse.Operations):
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
-        elif isinstance(e, StreamReaderFile):
-            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
         else:
             entry.st_mode |= stat.S_IFREG
+            if isinstance(e, FuseArvadosFile):
+                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+
+        if e.writable():
+            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
 
         entry.st_nlink = 1
         entry.st_uid = self.uid
@@ -217,10 +289,9 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @catch_exceptions
     def lookup(self, parent_inode, name):
         name = unicode(name, self.encoding)
-        _logger.debug("arv-mount lookup: parent_inode %i name %s",
-                      parent_inode, name)
         inode = None
 
         if name == '.':
@@ -234,28 +305,43 @@ class Operations(llfuse.Operations):
                     inode = p[name].inode
 
         if inode != None:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
+                      parent_inode, name, inode)
+            self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
+                      parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @catch_exceptions
+    def forget(self, inodes):
+        for inode, nlookup in inodes:
+            _logger.debug("arv-mount forget: %i %i", inode, nlookup)
+            ent = self.inodes[inode]
+            if ent.dec_ref(nlookup) == 0 and ent.dead:
+                self.inodes.del_entry(ent)
+
+    @catch_exceptions
     def open(self, inode, flags):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
             raise llfuse.FUSEError(errno.ENOENT)
 
-        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
-            raise llfuse.FUSEError(errno.EROFS)
-
         if isinstance(p, Directory):
             raise llfuse.FUSEError(errno.EISDIR)
 
+        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
         fh = self._filehandles_counter
         self._filehandles_counter += 1
         self._filehandles[fh] = FileHandle(fh, p)
         self.inodes.touch(p)
         return fh
 
+    @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
         if fh in self._filehandles:
@@ -263,20 +349,40 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        self.inodes.touch(handle.fileobj)
+        self.inodes.touch(handle.obj)
 
         try:
             with llfuse.lock_released:
-                return handle.fileobj.readfrom(off, size)
+                return handle.obj.readfrom(off, size, self.num_retries)
         except arvados.errors.NotFoundError as e:
             _logger.warning("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
-        except Exception:
-            _logger.exception()
-            raise llfuse.FUSEError(errno.EIO)
 
+    @catch_exceptions
+    def write(self, fh, off, buf):
+        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
+        if fh in self._filehandles:
+            handle = self._filehandles[fh]
+        else:
+            raise llfuse.FUSEError(errno.EBADF)
+
+        if not handle.obj.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.inodes.touch(handle.obj)
+
+        with llfuse.lock_released:
+            return handle.obj.writeto(off, buf, self.num_retries)
+
+    @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
+            try:
+                self._filehandles[fh].flush()
+            except EnvironmentError as e:
+                raise llfuse.FUSEError(e.errno)
+            except Exception:
+                _logger.exception("Flush error")
             self._filehandles[fh].release()
             del self._filehandles[fh]
         self.inodes.inode_cache.cap_cache()
@@ -284,6 +390,7 @@ class Operations(llfuse.Operations):
     def releasedir(self, fh):
         self.release(fh)
 
+    @catch_exceptions
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -308,7 +415,7 @@ class Operations(llfuse.Operations):
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
-
+    @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
 
@@ -317,7 +424,7 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
+        _logger.debug("arv-mount handle.dirobj %s", handle.obj)
 
         e = off
         while e < len(handle.entries):
@@ -328,6 +435,7 @@ class Operations(llfuse.Operations):
                     pass
             e += 1
 
+    @catch_exceptions
     def statfs(self):
         st = llfuse.StatvfsData()
         st.f_bsize = 64 * 1024
@@ -343,12 +451,80 @@ class Operations(llfuse.Operations):
         st.f_frsize = 0
         return st
 
-    # The llfuse documentation recommends only overloading functions that
-    # are actually implemented, as the default implementation will raise ENOSYS.
-    # However, there is a bug in the llfuse default implementation of create()
-    # "create() takes exactly 5 positional arguments (6 given)" which will crash
-    # arv-mount.
-    # The workaround is to implement it with the proper number of parameters,
-    # and then everything works out.
+    def _check_writable(self, inode_parent):
+        if inode_parent in self.inodes:
+            p = self.inodes[inode_parent]
+        else:
+            raise llfuse.FUSEError(errno.ENOENT)
+
+        if not isinstance(p, Directory):
+            raise llfuse.FUSEError(errno.ENOTDIR)
+
+        if not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if not isinstance(p, CollectionDirectoryBase):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        return p
+
+    @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx):
-        raise llfuse.FUSEError(errno.EROFS)
+        p = self._check_writable(inode_parent)
+
+        with llfuse.lock_released:
+            p.collection.open(name, "w")
+
+        # The file entry should have been implicitly created by callback.
+        f = p[name]
+        fh = self._filehandles_counter
+        self._filehandles_counter += 1
+        self._filehandles[fh] = FileHandle(fh, f)
+        self.inodes.touch(p)
+
+        f.inc_ref()
+        return (fh, self.getattr(f.inode))
+
+    @catch_exceptions
+    def mkdir(self, inode_parent, name, mode, ctx):
+        p = self._check_writable(inode_parent)
+
+        with llfuse.lock_released:
+            p.collection.mkdirs(name)
+
+        # The dir entry should have been implicitly created by callback.
+        d = p[name]
+
+        d.inc_ref()
+        return self.getattr(d.inode)
+
+    @catch_exceptions
+    def unlink(self, inode_parent, name):
+        p = self._check_writable(inode_parent)
+
+        with llfuse.lock_released:
+            p.collection.remove(name)
+
+    def rmdir(self, inode_parent, name):
+        self.unlink(inode_parent, name)
+
+    @catch_exceptions
+    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
+        src = self._check_writable(inode_parent_old)
+        dest = self._check_writable(inode_parent_new)
+
+        with llfuse.lock_released:
+            dest.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+            dest.flush()
+            src.flush()
+
+    @catch_exceptions
+    def flush(self, fh):
+        if fh in self._filehandles:
+            self._filehandles[fh].flush()
+
+    def fsync(self, fh, datasync):
+        self.flush(fh)
+
+    def fsyncdir(self, fh, datasync):
+        self.flush(fh)
index 5acadfdf7a4fb9b41b6f8c32515a8675cc7e9adf..aeb8f737c51ba9e82ceea9b171344720666e64ce 100644 (file)
@@ -31,8 +31,11 @@ class FreshBase(object):
         self._atime = time.time()
         self._poll_time = 60
         self.use_count = 0
-        self.cache_priority = 0
+        self.ref_count = 0
+        self.dead = False
+        self.cache_priority = None
         self.cache_size = 0
+        self.cache_uuid = None
 
     # Mark the value as stale
     def invalidate(self):
@@ -68,5 +71,16 @@ class FreshBase(object):
     def dec_use(self):
         self.use_count -= 1
 
+    def inc_ref(self):
+        self.ref_count += 1
+        return self.ref_count
+
+    def dec_ref(self, n):
+        self.ref_count -= n
+        return self.ref_count
+
     def objsize(self):
         return 0
+
+    def uuid(self):
+        return None
index 2757091aa420ef791e20fd88908c5b2b6270f163..0e86c8db2ba1316d01040fbaff32064542f5ddd5 100644 (file)
@@ -6,9 +6,10 @@ import arvados
 import apiclient
 import functools
 
-from fusefile import StringFile, StreamReaderFile, ObjectFile
+from fusefile import StringFile, ObjectFile, FuseArvadosFile
 from fresh import FreshBase, convertTime, use_counter
 
+import arvados.collection
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
 _logger = logging.getLogger('arvados.arvados_fuse')
@@ -137,6 +138,7 @@ class Directory(FreshBase):
             changed = True
 
         if changed:
+            llfuse.invalidate_inode(self.inode)
             self._mtime = time.time()
 
         self.fresh()
@@ -163,27 +165,78 @@ class Directory(FreshBase):
     def mtime(self):
         return self._mtime
 
+    def writable(self):
+        return False
+
+    def flush(self):
+        pass
+
+class CollectionDirectoryBase(Directory):
+    def __init__(self, parent_inode, inodes, collection):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
+        self.collection = collection
 
-class CollectionDirectory(Directory):
+    def new_entry(self, name, item, mtime):
+        name = sanitize_filename(name)
+        if isinstance(item, arvados.collection.RichCollectionBase):
+            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
+            self._entries[name].populate(mtime)
+        else:
+            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+
+    def on_event(self, event, collection, name, item):
+        _logger.warn("Got event! %s %s %s %s", event, collection, name, item)
+        if collection == self.collection:
+            with llfuse.lock:
+                if event == arvados.collection.ADD:
+                    self.new_entry(name, item, self.mtime())
+                elif event == arvados.collection.DEL:
+                    ent = self._entries[name]
+                    del self._entries[name]
+                    llfuse.invalidate_entry(self.inode, name)
+                    self.inodes.del_entry(ent)
+                elif event == arvados.collection.MOD:
+                    ent = self._entries[name]
+                    llfuse.invalidate_inode(ent.inode)
+        _logger.warn("Finished handling event")
+
+    def populate(self, mtime):
+        self._mtime = mtime
+        self.collection.subscribe(self.on_event)
+        for entry, item in self.collection.items():
+            self.new_entry(entry, item, self.mtime())
+
+    def writable(self):
+        return self.collection.writable()
+
+    def flush(self):
+        self.collection.root_collection().save()
+
+class CollectionDirectory(CollectionDirectoryBase):
     """Represents the root of a directory tree holding a collection."""
 
-    def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes)
+    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
         self.api = api
         self.num_retries = num_retries
-        self.collection_object_file = None
-        self.collection_object = None
-        if isinstance(collection, dict):
-            self.collection_locator = collection['uuid']
-            self._mtime = convertTime(collection.get('modified_at'))
+        self.collection_record_file = None
+        self.collection_record = None
+        if isinstance(collection_record, dict):
+            self.collection_locator = collection_record['uuid']
+            self._mtime = convertTime(collection_record.get('modified_at'))
         else:
-            self.collection_locator = collection
+            self.collection_locator = collection_record
             self._mtime = 0
         self._manifest_size = 0
+        if self.collection_locator:
+            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
+    def writable(self):
+        return self.collection.writable() if self.collection is not None else self._writable
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -192,33 +245,30 @@ class CollectionDirectory(Directory):
         """
 
         self.collection_locator = new_locator
-        self.collection_object = None
+        self.collection_record = None
         self.update()
 
-    def new_collection(self, new_collection_object, coll_reader):
-        self.clear(force=True)
+    def new_collection(self, new_collection_record, coll_reader):
+        if self.inode:
+            self.clear(force=True)
 
-        self.collection_object = new_collection_object
+        self.collection_record = new_collection_record
 
-        self._mtime = convertTime(self.collection_object.get('modified_at'))
+        if self.collection_record:
+            self._mtime = convertTime(self.collection_record.get('modified_at'))
+            self.collection_locator = self.collection_record["uuid"]
+            if self.collection_record_file is not None:
+                self.collection_record_file.update(self.collection_record)
 
-        if self.collection_object_file is not None:
-            self.collection_object_file.update(self.collection_object)
+        self.collection = coll_reader
+        self.populate(self.mtime())
 
-        for s in coll_reader.all_streams():
-            cwd = self
-            for part in s.name().split('/'):
-                if part != '' and part != '.':
-                    partname = sanitize_filename(part)
-                    if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
-                    cwd = cwd._entries[partname]
-            for k, v in s.files().items():
-                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
+    def uuid(self):
+        return self.collection_locator
 
     def update(self):
         try:
-            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
+            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
             if self.collection_locator is None:
@@ -226,48 +276,56 @@ class CollectionDirectory(Directory):
                 return True
 
             with llfuse.lock_released:
-                coll_reader = arvados.CollectionReader(
-                    self.collection_locator, self.api, self.api.keep,
-                    num_retries=self.num_retries)
-                new_collection_object = coll_reader.api_response() or {}
-                # If the Collection only exists in Keep, there will be no API
-                # response.  Fill in the fields we need.
-                if 'uuid' not in new_collection_object:
-                    new_collection_object['uuid'] = self.collection_locator
-                if "portable_data_hash" not in new_collection_object:
-                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
-                if 'manifest_text' not in new_collection_object:
-                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
-                coll_reader.normalize()
+                _logger.debug("Updating %s", self.collection_locator)
+                if self.collection:
+                    self.collection.update()
+                else:
+                    if uuid_pattern.match(self.collection_locator):
+                        coll_reader = arvados.collection.Collection(
+                            self.collection_locator, self.api, self.api.keep,
+                            num_retries=self.num_retries)
+                    else:
+                        coll_reader = arvados.collection.CollectionReader(
+                            self.collection_locator, self.api, self.api.keep,
+                            num_retries=self.num_retries)
+                    new_collection_record = coll_reader.api_response() or {}
+                    # If the Collection only exists in Keep, there will be no API
+                    # response.  Fill in the fields we need.
+                    if 'uuid' not in new_collection_record:
+                        new_collection_record['uuid'] = self.collection_locator
+                    if "portable_data_hash" not in new_collection_record:
+                        new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                    if 'manifest_text' not in new_collection_record:
+                        new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+                    if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+                        self.new_collection(new_collection_record, coll_reader)
+
+                    self._manifest_size = len(coll_reader.manifest_text())
+                    _logger.debug("%s manifest_size %i", self, self._manifest_size)
             # end with llfuse.lock_released, re-acquire lock
 
-            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object, coll_reader)
-
-            self._manifest_size = len(coll_reader.manifest_text())
-            _logger.debug("%s manifest_size %i", self, self._manifest_size)
-
             self.fresh()
             return True
         except arvados.errors.NotFoundError:
             _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         return False
 
     def __getitem__(self, item):
         self.checkupdate()
         if item == '.arvados#collection':
-            if self.collection_object_file is None:
-                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
-                self.inodes.add_entry(self.collection_object_file)
-            return self.collection_object_file
+            if self.collection_record_file is None:
+                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
 
@@ -278,8 +336,8 @@ class CollectionDirectory(Directory):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        self.collection_object = None
-        self.collection_object_file = None
+        self.collection_record = None
+        self.collection_record_file = None
         super(CollectionDirectory, self).invalidate()
 
     def persisted(self):
@@ -450,6 +508,9 @@ class ProjectDirectory(Directory):
         else:
             return None
 
+    def uuid(self):
+        return self.uuid
+
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
index efe31c387c09432f3905144c19b5417161d6a1a7..d3c13f3a16a227a6dd063bfe597161757eb936ab 100644 (file)
@@ -18,32 +18,50 @@ class File(FreshBase):
     def size(self):
         return 0
 
-    def readfrom(self, off, size):
+    def readfrom(self, off, size, num_retries=0):
         return ''
 
+    def writeto(self, off, size, num_retries=0):
+        raise Exception("Not writable")
+
     def mtime(self):
         return self._mtime
 
     def clear(self, force=False):
         return True
 
+    def writable(self):
+        return False
+
+    def flush(self):
+        pass
 
-class StreamReaderFile(File):
-    """Wraps a StreamFileReader as a file."""
+class FuseArvadosFile(File):
+    """Wraps a ArvadosFile."""
 
-    def __init__(self, parent_inode, reader, _mtime):
-        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
-        self.reader = reader
+    def __init__(self, parent_inode, arvfile, _mtime):
+        super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
+        self.arvfile = arvfile
 
     def size(self):
-        return self.reader.size()
+        return self.arvfile.size()
 
-    def readfrom(self, off, size):
-        return self.reader.readfrom(off, size)
+    def readfrom(self, off, size, num_retries=0):
+        return self.arvfile.readfrom(off, size, num_retries, exact=True)
+
+    def writeto(self, off, buf, num_retries=0):
+        return self.arvfile.writeto(off, buf, num_retries)
 
     def stale(self):
         return False
 
+    def writable(self):
+        return self.arvfile.writable()
+
+    def flush(self):
+        if self.writable():
+            self.arvfile.parent.root_collection().save()
+
 
 class StringFile(File):
     """Wrap a simple string as a file"""
@@ -54,7 +72,7 @@ class StringFile(File):
     def size(self):
         return len(self.contents)
 
-    def readfrom(self, off, size):
+    def readfrom(self, off, size, num_retries=0):
         return self.contents[off:(off+size)]
 
 
index 0ef3ea67df379806a56820ba273b482d61d0281b..34b5fc260d2d7e88747a6534ad5fd52ffd8b6950 100644 (file)
@@ -29,7 +29,7 @@ setup(name='arvados_fuse',
         'bin/arv-mount'
         ],
       install_requires=[
-        'arvados-python-client>=0.1.20150303143450',
+        'arvados-python-client',
         'llfuse',
         'python-daemon',
         'ciso8601'
index 61a365f8b06c9c7d5294d4662aa09aa118458191..cce5b95a80ed5c5ab0dfc6819ff96bb661f6387d 100644 (file)
@@ -140,6 +140,7 @@ class InodeTests(unittest.TestCase):
         # Delete ent1
         self.assertEqual(500, cache.total())
         ent1.clear.return_value = True
+        ent1.ref_count = 0
         inodes.del_entry(ent1)
         self.assertEqual(0, cache.total())
         cache.touch(ent3)
index 5535494a67b77089d233544e2c2c1e219eedc622..c7d63873cdd7ae3ddb851d8afc7f053d13e0086a 100644 (file)
@@ -12,9 +12,12 @@ import tempfile
 import threading
 import time
 import unittest
-
+import logging
+import multiprocessing
 import run_test_server
 
+logger = logging.getLogger('arvados.arv-mount')
+
 class MountTestBase(unittest.TestCase):
     def setUp(self):
         self.keeptmp = tempfile.mkdtemp()
@@ -25,13 +28,14 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid())
-        operations.inodes.add_entry(root_class(
-            llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
-        llfuse.init(operations, self.mounttmp, [])
+        self.operations = fuse.Operations(os.getuid(), os.getgid())
+        self.operations.inodes.add_entry(root_class(
+            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+        llfuse.init(self.operations, self.mounttmp, [])
         threading.Thread(None, llfuse.main).start()
         # wait until the driver is finished initializing
-        operations.initlock.wait()
+        self.operations.initlock.wait()
+        return self.operations.inodes[llfuse.ROOT_INODE]
 
     def tearDown(self):
         # llfuse.close is buggy, so use fusermount instead.
@@ -51,7 +55,7 @@ class MountTestBase(unittest.TestCase):
         path = self.mounttmp
         if subdir:
             path = os.path.join(path, subdir)
-        self.assertEqual(sorted(expect_content), sorted(os.listdir(path)))
+        self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))
 
 
 class FuseMountTest(MountTestBase):
@@ -98,7 +102,7 @@ class FuseMountTest(MountTestBase):
         self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
 
     def runTest(self):
-        self.make_mount(fuse.CollectionDirectory, collection=self.testcollection)
+        self.make_mount(fuse.CollectionDirectory, collection_record=self.testcollection)
 
         self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
                                       'edgecases', 'dir1', 'dir2'])
@@ -157,7 +161,7 @@ class FuseMagicTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.MagicDirectory)
 
-        mount_ls = os.listdir(self.mounttmp)
+        mount_ls = llfuse.listdir(self.mounttmp)
         self.assertIn('README', mount_ls)
         self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
                              arvados.util.uuid_pattern.match(fn)
@@ -166,11 +170,11 @@ class FuseMagicTest(MountTestBase):
         self.assertDirContents(self.testcollection, ['thing1.txt'])
         self.assertDirContents(os.path.join('by_id', self.testcollection),
                                ['thing1.txt'])
-        mount_ls = os.listdir(self.mounttmp)
+        mount_ls = llfuse.listdir(self.mounttmp)
         self.assertIn('README', mount_ls)
         self.assertIn(self.testcollection, mount_ls)
         self.assertIn(self.testcollection,
-                      os.listdir(os.path.join(self.mounttmp, 'by_id')))
+                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
 
         files = {}
         files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
@@ -184,15 +188,15 @@ class FuseTagsTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.TagsDirectory)
 
-        d1 = os.listdir(self.mounttmp)
+        d1 = llfuse.listdir(self.mounttmp)
         d1.sort()
         self.assertEqual(['foo_tag'], d1)
 
-        d2 = os.listdir(os.path.join(self.mounttmp, 'foo_tag'))
+        d2 = llfuse.listdir(os.path.join(self.mounttmp, 'foo_tag'))
         d2.sort()
         self.assertEqual(['zzzzz-4zz18-fy296fx3hot09f7'], d2)
 
-        d3 = os.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
+        d3 = llfuse.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
         d3.sort()
         self.assertEqual(['foo'], d3)
 
@@ -208,12 +212,12 @@ class FuseTagsUpdateTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.TagsDirectory, poll_time=1)
 
-        self.assertIn('foo_tag', os.listdir(self.mounttmp))
+        self.assertIn('foo_tag', llfuse.listdir(self.mounttmp))
 
         bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
         self.tag_collection(bar_uuid, 'fuse_test_tag')
         time.sleep(1)
-        self.assertIn('fuse_test_tag', os.listdir(self.mounttmp))
+        self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
         self.assertDirContents('fuse_test_tag', [bar_uuid])
 
         baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
@@ -234,14 +238,14 @@ class FuseSharedTest(MountTestBase):
         # shared_dirs is a list of the directories exposed
         # by fuse.SharedDirectory (i.e. any object visible
         # to the current user)
-        shared_dirs = os.listdir(self.mounttmp)
+        shared_dirs = llfuse.listdir(self.mounttmp)
         shared_dirs.sort()
         self.assertIn('FUSE User', shared_dirs)
 
         # fuse_user_objs is a list of the objects owned by the FUSE
         # test user (which present as files in the 'FUSE User'
         # directory)
-        fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
+        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
         fuse_user_objs.sort()
         self.assertEqual(['FUSE Test Project',                    # project owned by user
                           'collection #1 owned by FUSE',          # collection owned by user
@@ -250,7 +254,7 @@ class FuseSharedTest(MountTestBase):
                       ], fuse_user_objs)
 
         # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
+        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
         test_proj_files.sort()
         self.assertEqual(['collection in FUSE project',
                           'pipeline instance in FUSE project.pipelineInstance',
@@ -285,10 +289,10 @@ class FuseHomeTest(MountTestBase):
         self.make_mount(fuse.ProjectDirectory,
                         project_object=self.api.users().current().execute())
 
-        d1 = os.listdir(self.mounttmp)
+        d1 = llfuse.listdir(self.mounttmp)
         self.assertIn('Unrestricted public data', d1)
 
-        d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
+        d2 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
         public_project = run_test_server.fixture('groups')[
             'anonymously_accessible_project']
         found_in = 0
@@ -308,9 +312,350 @@ class FuseHomeTest(MountTestBase):
         self.assertNotEqual(0, found_in)
         self.assertNotEqual(0, found_not_in)
 
-        d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
+        d3 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
         self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
 
+class FuseUpdateFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+        with open(os.path.join(self.mounttmp, "file1.txt")) as f:
+            self.assertEqual("blub", f.read())
+
+        with collection.open("file1.txt", "w") as f:
+            f.write("plnp")
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+        with open(os.path.join(self.mounttmp, "file1.txt")) as f:
+            self.assertEqual("plnp", f.read())
+
+class FuseAddFileToCollectionTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+        with collection.open("file2.txt", "w") as f:
+            f.write("plnp")
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+
+class FuseRemoveFileFromCollectionTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        with collection.open("file2.txt", "w") as f:
+            f.write("plnp")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+
+        collection.remove("file2.txt")
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+class FuseCreateFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.assertNotIn("file1.txt", collection)
+
+        with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
+            pass
+
+        self.assertIn("file1.txt", collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. d41d8cd98f00b204e9800998ecf8427e\+0\+A[a-f0-9]{40}@[a-f0-9]{8} 0:0:file1\.txt$')
+
+def fuseWriteFileTestHelper(mounttmp):
+    with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
+        return f.read() == "Hello world!"
+
+class FuseWriteFileTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.assertNotIn("file1.txt", collection)
+
+        with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
+            f.write("Hello world!")
+
+        with collection.open("file1.txt") as f:
+            self.assertEqual(f.read(), "Hello world!")
+
+        # We can't just open the collection for reading because the underlying
+        # C implementation of open() makes a fstat() syscall with the GIL still
+        # held.  When the GETATTR message comes back to llfuse (which in these
+        # tests is in the same interpreter process) it can't acquire the GIL,
+        # so it can't service the fstat() call, so it deadlocks.  The
+        # workaround is to run some of our test code in a separate process.
+        # Forturnately the multiprocessing module makes this relatively easy.
+        pool = multiprocessing.Pool(1)
+        self.assertTrue(pool.apply(fuseWriteFileTestHelper, (self.mounttmp,)))
+        pool.close()
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+def fuseUpdateFileTestHelper1(mounttmp):
+    with open(os.path.join(mounttmp, "file1.txt"), "r+") as f:
+        fr = f.read()
+        if fr != "Hello world!":
+            raise Exception("Got %s expected 'Hello world!'" % fr)
+        f.seek(0)
+        f.write("Hola mundo!")
+        f.seek(0)
+        fr = f.read()
+        if fr != "Hola mundo!!":
+            raise Exception("Got %s expected 'Hola mundo!!'" % fr)
+        return True
+
+def fuseUpdateFileTestHelper2(mounttmp):
+    with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
+        return f.read() == "Hola mundo!!"
+
+class FuseUpdateFileTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        with open(os.path.join(self.mounttmp, "file1.txt"), "w") as f:
+            f.write("Hello world!")
+
+        # See note in FuseWriteFileTest
+        pool = multiprocessing.Pool(1)
+        self.assertTrue(pool.apply(fuseUpdateFileTestHelper1, (self.mounttmp,)))
+        self.assertTrue(pool.apply(fuseUpdateFileTestHelper2, (self.mounttmp,)))
+        pool.close()
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. daaef200ebb921e011e3ae922dd3266b\+11\+A[a-f0-9]{40}@[a-f0-9]{8} 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:11:file1\.txt 22:1:file1\.txt$')
+
+
+class FuseMkdirTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        with self.assertRaises(IOError):
+            with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+        os.mkdir(os.path.join(self.mounttmp, "testdir"))
+
+        with self.assertRaises(OSError):
+            os.mkdir(os.path.join(self.mounttmp, "testdir"))
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["testdir"], d1)
+
+        with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
+            f.write("Hello world!")
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual(["file1.txt"], d1)
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+class FuseRmTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        os.mkdir(os.path.join(self.mounttmp, "testdir"))
+
+        with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
+            f.write("Hello world!")
+
+        # Starting manifest
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+        # Can't delete because it's not empty
+        with self.assertRaises(OSError):
+            os.rmdir(os.path.join(self.mounttmp, "testdir"))
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual(["file1.txt"], d1)
+
+        # Delete file
+        os.remove(os.path.join(self.mounttmp, "testdir", "file1.txt"))
+
+        # Make sure it's empty
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual([], d1)
+
+        # Try to delete it again
+        with self.assertRaises(OSError):
+            os.remove(os.path.join(self.mounttmp, "testdir", "file1.txt"))
+
+        # Can't have empty directories :-( so manifest will be empty.
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+        # Should be able to delete now that it is empty
+        os.rmdir(os.path.join(self.mounttmp, "testdir"))
+
+        # Make sure it's empty
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual([], d1)
+
+        # Try to delete it again
+        with self.assertRaises(OSError):
+            os.rmdir(os.path.join(self.mounttmp, "testdir"))
+
+        # manifest should be empty now.
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+
+class FuseMvTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        os.mkdir(os.path.join(self.mounttmp, "testdir"))
+
+        with open(os.path.join(self.mounttmp, "testdir", "file1.txt"), "w") as f:
+            f.write("Hello world!")
+
+        # Starting manifest
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["testdir"], d1)
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual(["file1.txt"], d1)
+
+        os.rename(os.path.join(self.mounttmp, "testdir", "file1.txt"), os.path.join(self.mounttmp, "file1.txt"))
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["file1.txt", "testdir"], sorted(d1))
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual([], d1)
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+class FuseUpdateFromEventTest(MountTestBase):
+    def runTest(self):
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        self.operations.listen_for_events(self.api)
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual([], sorted(d1))
+
+        with arvados.collection.Collection(collection.manifest_locator(), api_client=self.api) as collection2:
+            with collection2.open("file1.txt", "w") as f:
+                f.write("foo")
+
+        time.sleep(1)
+
+        # should show up via event bus notify
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["file1.txt"], sorted(d1))
+
 
 class FuseUnitTest(unittest.TestCase):
     def test_sanitize_filename(self):