Merge branch 'master' into 3198-writable-fuse
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 18 May 2015 20:50:36 +0000 (16:50 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 18 May 2015 20:50:36 +0000 (16:50 -0400)
14 files changed:
crunch_scripts/crunchutil/vwd.py
sdk/python/arvados/_ranges.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/bin/arv-mount
services/fuse/setup.py
services/fuse/tests/test_inodes.py
services/fuse/tests/test_mount.py

index ca4116d7cee9015cbeb03bfaaf3ea4ca7dadfe03..0ae1c4620995014f61d17379bca756d2415f6e4c 100644 (file)
@@ -73,6 +73,12 @@ def checkin(target_dir):
                     if pdh is not None:
                         # 2. load collection
                         if pdh not in collections:
+                            # 2.1 make sure it is flushed (see #5787 note 11)
+                            fd = os.open(real[0], os.O_RDONLY)
+                            os.fsync(fd)
+                            os.close(fd)
+
+                            # 2.2 get collection from API server
                             collections[pdh] = arvados.collection.CollectionReader(pdh,
                                                                                    api_client=outputcollection._my_api(),
                                                                                    keep_client=outputcollection._my_keep(),
index d4f20f00087507a89c3d363b29a400176e45d542..d5ff6ed1b581f1f1b2047c8fa9198e5371a5e917 100644 (file)
@@ -2,6 +2,9 @@ import logging
 
 _logger = logging.getLogger('arvados.ranges')
 
+# Custom log level just below logging.DEBUG (10), used for very verbose range tracing.
+RANGES_SPAM = 9
+
 class Range(object):
     def __init__(self, locator, range_start, range_size, segment_offset=0):
         self.locator = locator
@@ -96,7 +99,7 @@ def locators_and_ranges(data_locators, range_start, range_size):
         block_start = dl.range_start
         block_size = dl.range_size
         block_end = block_start + block_size
-        _logger.debug(
+        _logger.log(RANGES_SPAM,
             "%s range_start %s block_start %s range_end %s block_end %s",
             dl.locator, range_start, block_start, range_end, block_end)
         if range_end <= block_start:
@@ -170,7 +173,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         dl = data_locators[i]
         old_segment_start = dl.range_start
         old_segment_end = old_segment_start + dl.range_size
-        _logger.debug(
+        _logger.log(RANGES_SPAM,
             "%s range_start %s segment_start %s range_end %s segment_end %s",
             dl, new_range_start, old_segment_start, new_range_end,
             old_segment_end)
index 2e0eaa8ce0f645ca305826851c5eb7ddccb434e2..2d44d6ad6ee8527b88815b21891dd33a278584f5 100644 (file)
@@ -9,6 +9,7 @@ import Queue
 import copy
 import errno
 import re
+import logging
 
 from .errors import KeepWriteError, AssertionError
 from .keep import KeepLocator
@@ -16,6 +17,11 @@ from ._normalize_stream import normalize_stream
 from ._ranges import locators_and_ranges, replace_range, Range
 from .retry import retry_method
 
+MOD = "mod"
+WRITE = "write"
+
+_logger = logging.getLogger('arvados.arvfile')
+
 def split(path):
     """split(path) -> streamname, filename
 
@@ -330,6 +336,12 @@ class _BufferBlock(object):
         bufferblock.append(self.buffer_view[0:self.size()])
         return bufferblock
 
+    @synchronized
+    def clear(self):
+        self.owner = None
+        self.buffer_block = None
+        self.buffer_view = None
+
 
 class NoopLock(object):
     def __enter__(self):
@@ -349,7 +361,7 @@ def must_be_writable(orig_func):
     @functools.wraps(orig_func)
     def must_be_writable_wrapper(self, *args, **kwargs):
         if not self.writable():
-            raise IOError((errno.EROFS, "Collection must be writable."))
+            raise IOError(errno.EROFS, "Collection must be writable.")
         return orig_func(self, *args, **kwargs)
     return must_be_writable_wrapper
 
@@ -436,11 +448,17 @@ class _BlockManager(object):
         self._prefetch_threads = None
         self._prefetch_queue = None
 
-    def commit_bufferblock(self, block):
+    def commit_bufferblock(self, block, wait):
         """Initiate a background upload of a bufferblock.
 
-        This will block if the upload queue is at capacity, otherwise it will
-        return immediately.
+        :block:
+          The block object to upload
+
+        :wait:
+          If `wait` is True, upload the block synchronously.
+          If `wait` is False, upload the block asynchronously.  This will
+          return immediately unless the upload queue is at capacity, in
+          which case it will wait on an upload queue slot.
 
         """
 
@@ -452,6 +470,7 @@ class _BlockManager(object):
                     bufferblock = self._put_queue.get()
                     if bufferblock is None:
                         return
+
                     loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
                     bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
@@ -461,30 +480,37 @@ class _BlockManager(object):
                     if self._put_queue is not None:
                         self._put_queue.task_done()
 
-        with self.lock:
-            if self._put_threads is None:
-                # Start uploader threads.
-
-                # If we don't limit the Queue size, the upload queue can quickly
-                # grow to take up gigabytes of RAM if the writing process is
-                # generating data more quickly than it can be send to the Keep
-                # servers.
-                #
-                # With two upload threads and a queue size of 2, this means up to 4
-                # blocks pending.  If they are full 64 MiB blocks, that means up to
-                # 256 MiB of internal buffering, which is the same size as the
-                # default download block cache in KeepClient.
-                self._put_queue = Queue.Queue(maxsize=2)
-                self._put_errors = Queue.Queue()
-
-                self._put_threads = []
-                for i in xrange(0, self.num_put_threads):
-                    thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
-                    self._put_threads.append(thread)
-                    thread.daemon = True
-                    thread.start()
+        if block.state() != _BufferBlock.WRITABLE:
+            return
+
+        if wait:
+            block.set_state(_BufferBlock.PENDING)
+            loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+            block.set_state(_BufferBlock.COMMITTED, loc)
+        else:
+            with self.lock:
+                if self._put_threads is None:
+                    # Start uploader threads.
+
+                    # If we don't limit the Queue size, the upload queue can quickly
+                    # grow to take up gigabytes of RAM if the writing process is
+                    # generating data more quickly than it can be sent to the Keep
+                    # servers.
+                    #
+                    # With two upload threads and a queue size of 2, this means up to 4
+                    # blocks pending.  If they are full 64 MiB blocks, that means up to
+                    # 256 MiB of internal buffering, which is the same size as the
+                    # default download block cache in KeepClient.
+                    self._put_queue = Queue.Queue(maxsize=2)
+                    self._put_errors = Queue.Queue()
+
+                    self._put_threads = []
+                    for i in xrange(0, self.num_put_threads):
+                        thread = threading.Thread(target=commit_bufferblock_worker, args=(self,))
+                        self._put_threads.append(thread)
+                        thread.daemon = True
+                        thread.start()
 
-        if block.state() == _BufferBlock.WRITABLE:
             # Mark the block as PENDING so to disallow any more appends.
             block.set_state(_BufferBlock.PENDING)
             self._put_queue.put(block)
@@ -493,6 +519,12 @@ class _BlockManager(object):
     def get_bufferblock(self, locator):
         return self._bufferblocks.get(locator)
 
+    @synchronized
+    def delete_bufferblock(self, locator):
+        bb = self._bufferblocks[locator]
+        bb.clear()
+        del self._bufferblocks[locator]
+
     def get_block_contents(self, locator, num_retries, cache_only=False):
         """Fetch a block.
 
@@ -524,7 +556,8 @@ class _BlockManager(object):
             items = self._bufferblocks.items()
 
         for k,v in items:
-            v.owner.flush()
+            if v.state() == _BufferBlock.WRITABLE:
+                v.owner.flush(False)
 
         with self.lock:
             if self._put_queue is not None:
@@ -539,6 +572,13 @@ class _BlockManager(object):
                         pass
                     raise KeepWriteError("Error writing some blocks", err, label="block")
 
+        for k,v in items:
+            # flush again with wait=True to remove committed bufferblocks from
+            # the segments.
+            if v.owner:
+                v.owner.flush(True)
+
+
     def block_prefetch(self, locator):
         """Initiate a background download of a block.
 
@@ -588,7 +628,7 @@ class ArvadosFile(object):
 
     """
 
-    def __init__(self, parent, stream=[], segments=[]):
+    def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
 
@@ -599,6 +639,7 @@ class ArvadosFile(object):
           a list of Range objects representing segments
         """
         self.parent = parent
+        self.name = name
         self._modified = True
         self._segments = []
         self.lock = parent.root_collection().lock
@@ -614,9 +655,9 @@ class ArvadosFile(object):
         return copy.copy(self._segments)
 
     @synchronized
-    def clone(self, new_parent):
+    def clone(self, new_parent, new_name):
         """Make a copy of this file."""
-        cp = ArvadosFile(new_parent)
+        cp = ArvadosFile(new_parent, new_name)
         cp.replace_contents(self)
         return cp
 
@@ -713,8 +754,7 @@ class ArvadosFile(object):
             self._segments = new_segs
             self._modified = True
         elif size > self.size():
-            raise IOError("truncate() does not support extending the file size")
-
+            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
 
     def readfrom(self, offset, size, num_retries, exact=False):
         """Read up to `size` bytes from the file starting at `offset`.
@@ -795,18 +835,38 @@ class ArvadosFile(object):
         if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
             self._repack_writes(num_retries)
             if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
-                self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, False)
                 self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
 
         self._current_bblock.append(data)
 
         replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
 
+        self.parent.notify(WRITE, self.parent, self.name, (self, self))
+
+        return len(data)
+
     @synchronized
-    def flush(self, num_retries=0):
-        if self._current_bblock:
-            self._repack_writes(num_retries)
-            self.parent._my_block_manager().commit_bufferblock(self._current_bblock)
+    def flush(self, wait=True, num_retries=0):
+        """Flush bufferblocks to Keep."""
+        if self.modified():
+            if self._current_bblock and self._current_bblock.state() == _BufferBlock.WRITABLE:
+                self._repack_writes(num_retries)
+                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, wait)
+            if wait:
+                to_delete = set()
+                for s in self._segments:
+                    bb = self.parent._my_block_manager().get_bufferblock(s.locator)
+                    if bb:
+                        if bb.state() != _BufferBlock.COMMITTED:
+                            _logger.error("bufferblock %s is not committed" % (s.locator))
+                        else:
+                            to_delete.add(s.locator)
+                            s.locator = bb.locator()
+                for s in to_delete:
+                   self.parent._my_block_manager().delete_bufferblock(s)
+
+            self.parent.notify(MOD, self.parent, self.name, (self, self))
 
     @must_be_writable
     @synchronized
@@ -852,6 +912,16 @@ class ArvadosFile(object):
         buf += "\n"
         return buf
 
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self._modified = True
+        self.flush()
+        self.parent.remove(self.name)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
 
 class ArvadosFileReader(ArvadosFileReaderBase):
     """Wraps ArvadosFile in a file-like object supporting reading only.
@@ -861,8 +931,8 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     """
 
-    def __init__(self, arvadosfile, name, mode="r", num_retries=None):
-        super(ArvadosFileReader, self).__init__(name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile,  mode="r", num_retries=None):
+        super(ArvadosFileReader, self).__init__(arvadosfile.name, mode, num_retries=num_retries)
         self.arvadosfile = arvadosfile
 
     def size(self):
@@ -873,16 +943,32 @@ class ArvadosFileReader(ArvadosFileReaderBase):
 
     @_FileLikeObjectBase._before_close
     @retry_method
-    def read(self, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position."""
-        data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
-        self._filepos += len(data)
-        return data
+    def read(self, size=None, num_retries=None):
+        """Read up to `size` bytes from the file and return the result.
+
+        Starts at the current file position.  If `size` is None, read the
+        entire remainder of the file.
+        """
+        if size is None:
+            data = []
+            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+            while rd:
+                data.append(rd)
+                self._filepos += len(rd)
+                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
+            return ''.join(data)
+        else:
+            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
+            self._filepos += len(data)
+            return data
 
     @_FileLikeObjectBase._before_close
     @retry_method
     def readfrom(self, offset, size, num_retries=None):
-        """Read up to `size` bytes from the stream, starting at the current file position."""
+        """Read up to `size` bytes from the stream, starting at the specified file offset.
+
+        This method does not change the file position.
+        """
         return self.arvadosfile.readfrom(offset, size, num_retries)
 
     def flush(self):
@@ -897,8 +983,8 @@ class ArvadosFileWriter(ArvadosFileReader):
 
     """
 
-    def __init__(self, arvadosfile, name, mode, num_retries=None):
-        super(ArvadosFileWriter, self).__init__(arvadosfile, name, mode, num_retries=num_retries)
+    def __init__(self, arvadosfile, mode, num_retries=None):
+        super(ArvadosFileWriter, self).__init__(arvadosfile, mode, num_retries=num_retries)
 
     @_FileLikeObjectBase._before_close
     @retry_method
@@ -908,6 +994,7 @@ class ArvadosFileWriter(ArvadosFileReader):
         else:
             self.arvadosfile.writeto(self._filepos, data, num_retries)
             self._filepos += len(data)
+        return len(data)
 
     @_FileLikeObjectBase._before_close
     @retry_method
index 30828732d8d4908ca0922bc780c11d2a6943578e..eea07179649bb5103cf1c5c4506b8a615d2a25c2 100644 (file)
@@ -488,6 +488,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._modified = True
+        self._callback = None
         self._items = {}
 
     def _my_api(self):
@@ -537,9 +538,9 @@ class RichCollectionBase(CollectionBase):
                 if item is None:
                     # create new file
                     if create_type == COLLECTION:
-                        item = Subcollection(self)
+                        item = Subcollection(self, pathcomponents[0])
                     else:
-                        item = ArvadosFile(self)
+                        item = ArvadosFile(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
@@ -547,14 +548,14 @@ class RichCollectionBase(CollectionBase):
             else:
                 if item is None:
                     # create new collection
-                    item = Subcollection(self)
+                    item = Subcollection(self, pathcomponents[0])
                     self._items[pathcomponents[0]] = item
                     self._modified = True
                     self.notify(ADD, self, pathcomponents[0], item)
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                    raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
         else:
             return self
 
@@ -580,9 +581,9 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
-                raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
+                raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
 
-    def mkdirs(path):
+    def mkdirs(self, path):
         """Recursive subcollection create.
 
         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
@@ -615,7 +616,7 @@ class RichCollectionBase(CollectionBase):
         create = (mode != "r")
 
         if create and not self.writable():
-            raise IOError((errno.EROFS, "Collection is read only"))
+            raise IOError(errno.EROFS, "Collection is read only")
 
         if create:
             arvfile = self.find_or_create(path, FILE)
@@ -623,9 +624,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError((errno.EISDIR, "Path must refer to a file."))
+            raise IOError(errno.EISDIR, "Path must refer to a file.")
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -633,16 +634,16 @@ class RichCollectionBase(CollectionBase):
         name = os.path.basename(path)
 
         if mode == "r":
-            return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
+            return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
 
     @synchronized
     def modified(self):
         """Test if the collection (or any subcollection or file) has been modified."""
         if self._modified:
             return True
-        for k,v in self._items.items():
+        for v in self._items.values():
             if v.modified():
                 return True
         return False
@@ -720,10 +721,10 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError((errno.ENOENT, "File not found"))
+            raise IOError(errno.ENOENT, "File not found")
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
+                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             self._modified = True
@@ -733,15 +734,15 @@ class RichCollectionBase(CollectionBase):
 
     def _clonefrom(self, source):
         for k,v in source.items():
-            self._items[k] = v.clone(self)
+            self._items[k] = v.clone(self, k)
 
     def clone(self):
         raise NotImplementedError()
 
     @must_be_writable
     @synchronized
-    def add(self, source_obj, target_name, overwrite=False):
-        """Copy a file or subcollection to this collection.
+    def add(self, source_obj, target_name, overwrite=False, reparent=False):
+        """Copy or move a file or subcollection to this collection.
 
         :source_obj:
           An ArvadosFile, or Subcollection object
@@ -753,24 +754,74 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
 
+        :reparent:
+          If True, source_obj will be moved from its parent collection to this collection.
+          If False, source_obj will be copied and the parent collection will be
+          unmodified.
+
         """
 
         if target_name in self and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+            raise IOError(errno.EEXIST, "File already exists")
 
         modified_from = None
         if target_name in self:
             modified_from = self[target_name]
 
-        # Actually make the copy.
-        dup = source_obj.clone(self)
-        self._items[target_name] = dup
+        # Actually make the move or copy.
+        if reparent:
+            source_obj._reparent(self, target_name)
+            item = source_obj
+        else:
+            item = source_obj.clone(self, target_name)
+
+        self._items[target_name] = item
         self._modified = True
 
         if modified_from:
-            self.notify(MOD, self, target_name, (modified_from, dup))
+            self.notify(MOD, self, target_name, (modified_from, item))
+        else:
+            self.notify(ADD, self, target_name, item)
+
+    def _get_src_target(self, source, target_path, source_collection, create_dest):
+        if source_collection is None:
+            source_collection = self
+
+        # Find the object
+        if isinstance(source, basestring):
+            source_obj = source_collection.find(source)
+            if source_obj is None:
+                raise IOError(errno.ENOENT, "File not found")
+            sourcecomponents = source.split("/")
         else:
-            self.notify(ADD, self, target_name, dup)
+            source_obj = source
+            sourcecomponents = None
+
+        # Find the parent collection of the target path
+        targetcomponents = target_path.split("/")
+
+        # Determine the name to use.
+        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
+
+        if not target_name:
+            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+
+        if create_dest:
+            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        else:
+            if len(targetcomponents) > 1:
+                target_dir = self.find("/".join(targetcomponents[0:-1]))
+            else:
+                target_dir = self
+
+        if target_dir is None:
+            raise IOError(errno.ENOENT, "Target directory not found.")
+
+        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
+            target_dir = target_dir[target_name]
+            target_name = sourcecomponents[-1]
+
+        return (source_obj, target_dir, target_name)
 
     @must_be_writable
     @synchronized
@@ -792,35 +843,35 @@ class RichCollectionBase(CollectionBase):
         :overwrite:
           Whether to overwrite target file if it already exists.
         """
-        if source_collection is None:
-            source_collection = self
 
-        # Find the object to copy
-        if isinstance(source, basestring):
-            source_obj = source_collection.find(source)
-            if source_obj is None:
-                raise IOError((errno.ENOENT, "File not found"))
-            sourcecomponents = source.split("/")
-        else:
-            source_obj = source
-            sourcecomponents = None
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
+        target_dir.add(source_obj, target_name, overwrite, False)
 
-        # Find parent collection the target path
-        targetcomponents = target_path.split("/")
+    @must_be_writable
+    @synchronized
+    def rename(self, source, target_path, source_collection=None, overwrite=False):
+        """Move a file or subcollection from `source_collection` to a new path in this collection.
 
-        # Determine the name to use.
-        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
+        :source:
+          A string with a path to source file or subcollection.
 
-        if not target_name:
-            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
+        :target_path:
+          Destination file or path.  If the target path already exists and is a
+          subcollection, the item will be placed inside the subcollection.  If
+          the target path already exists and is a file, this will raise an error
+          unless you specify `overwrite=True`.
 
-        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
+        :source_collection:
+          Collection to move `source` from (default `self`)
 
-        if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
-            target_dir = target_dir[target_name]
-            target_name = sourcecomponents[-1]
+        :overwrite:
+          Whether to overwrite target file if it already exists.
+        """
 
-        target_dir.add(source_obj, target_name, overwrite)
+        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
+        if not source_obj.writable():
+            raise IOError(errno.EROFS, "Source collection must be writable.")
+        target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
         """Get the manifest text for this collection, sub collections and files.
@@ -917,15 +968,15 @@ class RichCollectionBase(CollectionBase):
             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
         for k in self:
             if k not in end_collection:
-               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
+               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
         for k in end_collection:
             if k in self:
                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                 elif end_collection[k] != self[k]:
-                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
+                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
             else:
-                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
+                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
         return changes
 
     @must_be_writable
@@ -937,6 +988,8 @@ class RichCollectionBase(CollectionBase):
         alternate path indicating the conflict.
 
         """
+        if changes:
+            self._modified = True
         for change in changes:
             event_type = change[0]
             path = change[1]
@@ -980,6 +1033,24 @@ class RichCollectionBase(CollectionBase):
         stripped = self.portable_manifest_text()
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
+    @synchronized
+    def subscribe(self, callback):
+        if self._callback is None:
+            self._callback = callback
+        else:
+            raise errors.ArgumentError("A callback is already set on this collection.")
+
+    @synchronized
+    def unsubscribe(self):
+        if self._callback is not None:
+            self._callback = None
+
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+        self.root_collection().notify(event, collection, name, item)
+
     @synchronized
     def __eq__(self, other):
         if other is self:
@@ -998,6 +1069,12 @@ class RichCollectionBase(CollectionBase):
     def __ne__(self, other):
         return not self.__eq__(other)
 
+    @synchronized
+    def flush(self):
+        """Flush bufferblocks to Keep."""
+        for e in self.values():
+            e.flush()
+
 
 class Collection(RichCollectionBase):
     """Represents the root of an Arvados Collection.
@@ -1082,7 +1159,6 @@ class Collection(RichCollectionBase):
         self._api_response = None
 
         self.lock = threading.RLock()
-        self.callbacks = []
         self.events = None
 
         if manifest_locator_or_text:
@@ -1122,6 +1198,7 @@ class Collection(RichCollectionBase):
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
+        self._manifest_text = self.manifest_text()
 
     @synchronized
     def _my_api(self):
@@ -1215,7 +1292,7 @@ class Collection(RichCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block."""
-        if exc_type is not None:
+        if exc_type is None:
             if self.writable() and self._has_collection_uuid():
                 self.save()
         if self._block_manager is not None:
@@ -1236,7 +1313,7 @@ class Collection(RichCollectionBase):
         return self._manifest_locator
 
     @synchronized
-    def clone(self, new_parent=None, readonly=False, new_config=None):
+    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
         if new_config is None:
             new_config = self._config
         if readonly:
@@ -1362,6 +1439,7 @@ class Collection(RichCollectionBase):
         if create_collection_record:
             if name is None:
                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
+                ensure_unique_name = True
 
             body = {"manifest_text": text,
                     "name": name}
@@ -1378,19 +1456,6 @@ class Collection(RichCollectionBase):
 
         return text
 
-    @synchronized
-    def subscribe(self, callback):
-        self.callbacks.append(callback)
-
-    @synchronized
-    def unsubscribe(self, callback):
-        self.callbacks.remove(callback)
-
-    @synchronized
-    def notify(self, event, collection, name, item):
-        for c in self.callbacks:
-            c(event, collection, name, item)
-
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1420,6 +1485,7 @@ class Collection(RichCollectionBase):
                 segments = []
                 streamoffset = 0L
                 state = BLOCKS
+                self.mkdirs(stream_name)
                 continue
 
             if state == BLOCKS:
@@ -1453,6 +1519,11 @@ class Collection(RichCollectionBase):
 
         self.set_unmodified()
 
+    @synchronized
+    def notify(self, event, collection, name, item):
+        if self._callback:
+            self._callback(event, collection, name, item)
+
 
 class Subcollection(RichCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1462,10 +1533,12 @@ class Subcollection(RichCollectionBase):
 
     """
 
-    def __init__(self, parent):
+    def __init__(self, parent, name):
         super(Subcollection, self).__init__(parent)
         self.lock = self.root_collection().lock
         self._manifest_text = None
+        self.name = name
+        self.num_retries = parent.num_retries
 
     def root_collection(self):
         return self.parent.root_collection()
@@ -1482,21 +1555,25 @@ class Subcollection(RichCollectionBase):
     def _my_block_manager(self):
         return self.root_collection()._my_block_manager()
 
-    def notify(self, event, collection, name, item):
-        return self.root_collection().notify(event, collection, name, item)
-
     def stream_name(self):
-        for k, v in self.parent.items():
-            if v is self:
-                return os.path.join(self.parent.stream_name(), k)
-        return '.'
+        return os.path.join(self.parent.stream_name(), self.name)
 
     @synchronized
-    def clone(self, new_parent):
-        c = Subcollection(new_parent)
+    def clone(self, new_parent, new_name):
+        c = Subcollection(new_parent, new_name)
         c._clonefrom(self)
         return c
 
+    @must_be_writable
+    @synchronized
+    def _reparent(self, newparent, newname):
+        self._modified = True
+        self.flush()
+        self.parent.remove(self.name, recursive=True)
+        self.parent = newparent
+        self.name = newname
+        self.lock = self.parent.root_collection().lock
+
 
 class CollectionReader(Collection):
     """A read-only collection object.
index 1701aa43caae5baaa3f25522230c5b9c4ac46994..c5dbb1692aee231a8336f2f3b423e6d1e25bc2f5 100644 (file)
@@ -437,8 +437,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
             blocks[loc] = d
             stream.append(Range(loc, n, len(d)))
             n += len(d)
-        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
-        return ArvadosFileReader(af, "count.txt")
+        af = ArvadosFile(ArvadosFileReaderTestCase.MockParent(blocks, nocache), "count.txt", stream=stream, segments=[Range(1, 0, 3), Range(6, 3, 3), Range(11, 6, 3)])
+        return ArvadosFileReader(af)
 
     def test_read_block_crossing_behavior(self):
         # read() needs to return all the data requested if possible, even if it
@@ -477,8 +477,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__eq__from_writes(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("0123456789")
+                f = c2.open("count1.txt", "w")
+                f.write("0123456789")
 
                 self.assertTrue(c1["count1.txt"] == c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] != c2["count1.txt"])
@@ -486,8 +486,8 @@ class ArvadosFileReaderTestCase(StreamFileReaderTestCase):
     def test__ne__(self):
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt') as c1:
             with Collection() as c2:
-                with c2.open("count1.txt", "w") as f:
-                    f.write("1234567890")
+                f = c2.open("count1.txt", "w")
+                f.write("1234567890")
 
                 self.assertTrue(c1["count1.txt"] != c2["count1.txt"])
                 self.assertFalse(c1["count1.txt"] == c2["count1.txt"])
@@ -510,10 +510,10 @@ class ArvadosFileReadTestCase(unittest.TestCase, StreamRetryTestMixin):
         blockmanager = arvados.arvfile._BlockManager(self.keep_client())
         blockmanager.prefetch_enabled = False
         col = Collection(keep_client=self.keep_client(), block_manager=blockmanager)
-        af = ArvadosFile(col,
+        af = ArvadosFile(col, "test",
                          stream=stream,
                          segments=segments)
-        return ArvadosFileReader(af, "test", **kwargs)
+        return ArvadosFileReader(af, **kwargs)
 
     def read_for_test(self, reader, byte_count, **kwargs):
         return reader.read(byte_count, **kwargs)
@@ -597,7 +597,7 @@ class BlockManagerTest(unittest.TestCase):
         blockmanager = arvados.arvfile._BlockManager(mockkeep)
         bufferblock = blockmanager.alloc_bufferblock()
         bufferblock.owner = mock.MagicMock()
-        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
         bufferblock.append("foo")
         blockmanager.commit_all()
         self.assertTrue(bufferblock.owner.flush.called)
@@ -612,7 +612,7 @@ class BlockManagerTest(unittest.TestCase):
         blockmanager = arvados.arvfile._BlockManager(mockkeep)
         bufferblock = blockmanager.alloc_bufferblock()
         bufferblock.owner = mock.MagicMock()
-        bufferblock.owner.flush.side_effect = lambda: blockmanager.commit_bufferblock(bufferblock)
+        bufferblock.owner.flush.side_effect = lambda x: blockmanager.commit_bufferblock(bufferblock, False)
         bufferblock.append("foo")
         with self.assertRaises(arvados.errors.KeepWriteError) as err:
             blockmanager.commit_all()
index 5e1a055d483695c864df6adaf0572368b0f0e165..8bf08d0e1f666c4bde84d248c6dc0ad69bb1576c 100644 (file)
@@ -883,6 +883,24 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c.copy("count1.txt", "foo/")
         self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n", c.portable_manifest_text())
 
+    def test_rename_file(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.rename("count1.txt", "count2.txt")
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_dir(self):
+        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c.mkdirs("foo")
+        c.rename("count1.txt", "foo/count2.txt")
+        self.assertEqual("./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c.manifest_text())
+
+    def test_move_file_to_other(self):
+        c1 = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n')
+        c2 = Collection()
+        c2.rename("count1.txt", "count2.txt", source_collection=c1)
+        self.assertEqual("", c1.manifest_text())
+        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n", c2.manifest_text())
+
     def test_clone(self):
         c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt\n./foo 781e5e245d69b566979b86e28d23f2c7+10 0:10:count2.txt\n')
         cl = c.clone()
@@ -982,8 +1000,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count1.txt', c1["count1.txt"]),
                              ('add', './count2.txt', c2["count2.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so it should not be deleted.
         c1.apply(d)
@@ -994,8 +1012,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         c2 = Collection('. 5348b82a029fd9e971a811ce1f71360b+43 0:10:count1.txt')
         d = c1.diff(c2)
         self.assertEqual(d, [('mod', './count1.txt', c1["count1.txt"], c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 changed, so c2 mod will go to a conflict file
         c1.apply(d)
@@ -1007,8 +1025,8 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         d = c1.diff(c2)
         self.assertEqual(d, [('del', './count2.txt', c1["count2.txt"]),
                              ('add', './count1.txt', c2["count1.txt"])])
-        with c1.open("count1.txt", "w") as f:
-            f.write("zzzzz")
+        f = c1.open("count1.txt", "w")
+        f.write("zzzzz")
 
         # c1 added count1.txt, so c2 add will go to a conflict file
         c1.apply(d)
index 8d048487928c54f57dd944c083ee76ffb05c01d1..46c5a1b06bd88fdd22ddbe779bd195e2006cfc0f 100644 (file)
@@ -1,6 +1,49 @@
-#
-# FUSE driver for Arvados Keep
-#
+"""FUSE driver for Arvados Keep
+
+Architecture:
+
+There is one `Operations` object per mount point.  It is the entry point for all
+read and write requests from the llfuse module.
+
+The operations object owns an `Inodes` object.  The inodes object stores the
+mapping from numeric inode (used throughout the file system API to uniquely
+identify files) to the Python objects that implement files and directories.
+
+The `Inodes` object owns an `InodeCache` object.  The inode cache records the
+memory footprint of file system objects and when they are last used.  When the
+cache limit is exceeded, the least recently used objects are cleared.
+
+File system objects inherit from `fresh.FreshBase` which manages the object lifecycle.
+
+File objects inherit from `fusefile.File`.  Key methods are `readfrom` and `writeto`
+which implement actual reads and writes.
+
+Directory objects inherit from `fusedir.Directory`.  The directory object wraps
+a Python dict which stores the mapping from filenames to directory entries.
+Directory contents can be accessed through the Python operators such as `[]`
+and `in`.  These methods automatically check if the directory is fresh (up to
+date) or stale (needs update) and will call `update` if necessary before
+returning a result.
+
+The general FUSE operation flow is as follows:
+
+- The request handler is called with either an inode or file handle that is the
+  subject of the operation.
+
+- Look up the inode using the Inodes table or the file handle in the
+  filehandles table to get the file system object.
+
+- For methods that alter files or directories, check that the operation is
+  valid and permitted using _check_writable().
+
+- Call the relevant method on the file system object.
+
+- Return the result.
+
+The FUSE driver supports the Arvados event bus.  When an event is received for
+an object that is live in the inode cache, that object is immediately updated.
+
+"""
 
 import os
 import sys
@@ -22,44 +65,65 @@ import threading
 import itertools
 import ciso8601
 import collections
+import functools
 
-from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory
-from fusefile import StreamReaderFile, StringFile
+from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
+log_handler = logging.StreamHandler()
+llogger = logging.getLogger('llfuse')
+llogger.addHandler(log_handler)
+llogger.setLevel(logging.DEBUG)
 
-class FileHandle(object):
-    """Connects a numeric file handle to a File object that has
+class Handle(object):
+    """Connects a numeric file handle to a File or Directory object that has
     been opened by the client."""
 
-    def __init__(self, fh, fileobj):
+    def __init__(self, fh, obj):
         self.fh = fh
-        self.fileobj = fileobj
-        self.fileobj.inc_use()
+        self.obj = obj
+        self.obj.inc_use()
 
     def release(self):
-        self.fileobj.dec_use()
+        self.obj.dec_use()
+
+    def flush(self):
+        return self.obj.flush()
 
 
-class DirectoryHandle(object):
+class FileHandle(Handle):
+    """Connects a numeric file handle to a File object that has
+    been opened by the client."""
+    pass
+
+
+class DirectoryHandle(Handle):
     """Connects a numeric file handle to a Directory object that has
     been opened by the client."""
 
     def __init__(self, fh, dirobj, entries):
-        self.fh = fh
+        super(DirectoryHandle, self).__init__(fh, dirobj)
         self.entries = entries
-        self.dirobj = dirobj
-        self.dirobj.inc_use()
-
-    def release(self):
-        self.dirobj.dec_use()
 
 
 class InodeCache(object):
+    """Records the memory footprint of objects and when they are last used.
+
+    When the cache limit is exceeded, the least recently used objects are
+    cleared.  Clearing the object means discarding its contents to release
+    memory.  The next time the object is accessed, it must be re-fetched from
+    the server.  Note that the inode cache limit is a soft limit; the cache
+    limit may be exceeded if necessary to load very large objects, it may also
+    be exceeded if open file handles prevent objects from being cleared.
+
+    """
+
     def __init__(self, cap, min_entries=4):
         self._entries = collections.OrderedDict()
-        self._counter = itertools.count(1)
+        self._by_uuid = {}
+        self._counter = itertools.count(0)
         self.cap = cap
         self._total = 0
         self.min_entries = min_entries
@@ -69,15 +133,19 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear and not obj.clear():
-            _logger.debug("Could not clear %s in_use %s", obj, obj.in_use())
+            _logger.debug("InodeCache could not clear %i in_use %s", obj.inode, obj.in_use())
             return False
         self._total -= obj.cache_size
         del self._entries[obj.cache_priority]
-        _logger.debug("Cleared %s total now %i", obj, self._total)
+        if obj.cache_uuid:
+            del self._by_uuid[obj.cache_uuid]
+            obj.cache_uuid = None
+        if clear:
+            _logger.debug("InodeCache cleared %i total now %i", obj.inode, self._total)
         return True
 
     def cap_cache(self):
-        _logger.debug("total is %i cap is %i", self._total, self.cap)
+        #_logger.debug("InodeCache total is %i cap is %i", self._total, self.cap)
         if self._total > self.cap:
             for key in list(self._entries.keys()):
                 if self._total < self.cap or len(self._entries) < self.min_entries:
@@ -89,21 +157,28 @@ class InodeCache(object):
             obj.cache_priority = next(self._counter)
             obj.cache_size = obj.objsize()
             self._entries[obj.cache_priority] = obj
+            obj.cache_uuid = obj.uuid()
+            if obj.cache_uuid:
+                self._by_uuid[obj.cache_uuid] = obj
             self._total += obj.objsize()
-            _logger.debug("Managing %s total now %i", obj, self._total)
+            _logger.debug("InodeCache touched %i (size %i) total now %i", obj.inode, obj.objsize(), self._total)
             self.cap_cache()
+        else:
+            obj.cache_priority = None
 
     def touch(self, obj):
         if obj.persisted():
             if obj.cache_priority in self._entries:
                 self._remove(obj, False)
             self.manage(obj)
-            _logger.debug("Touched %s (%i) total now %i", obj, obj.objsize(), self._total)
 
     def unmanage(self, obj):
         if obj.persisted() and obj.cache_priority in self._entries:
             self._remove(obj, True)
 
+    def find(self, uuid):
+        return self._by_uuid.get(uuid)
+
 class Inodes(object):
     """Manage the set of inodes.  This is the mapping from a numeric id
     to a concrete File or Directory object"""
@@ -134,14 +209,40 @@ class Inodes(object):
 
     def add_entry(self, entry):
         entry.inode = next(self._counter)
+        if entry.inode == llfuse.ROOT_INODE:
+            entry.inc_ref()
         self._entries[entry.inode] = entry
         self.inode_cache.manage(entry)
         return entry
 
     def del_entry(self, entry):
-        self.inode_cache.unmanage(entry)
-        llfuse.invalidate_inode(entry.inode)
-        del self._entries[entry.inode]
+        if entry.ref_count == 0:
+            _logger.debug("Deleting inode %i", entry.inode)
+            self.inode_cache.unmanage(entry)
+            llfuse.invalidate_inode(entry.inode)
+            del self._entries[entry.inode]
+            entry.inode = None
+        else:
+            entry.dead = True
+            _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
+
+
+def catch_exceptions(orig_func):
+    """Catch uncaught exceptions and log them consistently."""
+
+    @functools.wraps(orig_func)
+    def catch_exceptions_wrapper(self, *args, **kwargs):
+        try:
+            return orig_func(self, *args, **kwargs)
+        except llfuse.FUSEError:
+            raise
+        except EnvironmentError as e:
+            raise llfuse.FUSEError(e.errno)
+        except:
+            _logger.exception("Unhandled exception during FUSE operation")
+            raise llfuse.FUSEError(errno.EIO)
+
+    return catch_exceptions_wrapper
 
 
 class Operations(llfuse.Operations):
@@ -156,7 +257,7 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None):
+    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4):
         super(Operations, self).__init__()
 
         if not inode_cache:
@@ -168,20 +269,56 @@ class Operations(llfuse.Operations):
 
         # dict of inode to filehandle
         self._filehandles = {}
-        self._filehandles_counter = 1
+        self._filehandles_counter = itertools.count(0)
 
         # Other threads that need to wait until the fuse driver
         # is fully initialized should wait() on this event object.
         self.initlock = threading.Event()
 
+        self.num_retries = num_retries
+
+        self.events = None
+
     def init(self):
         # Allow threads that are waiting for the driver to be finished
         # initializing to continue
         self.initlock.set()
 
+    def destroy(self):
+        if self.events:
+            self.events.close()
+
     def access(self, inode, mode, ctx):
         return True
 
+    def listen_for_events(self, api_client):
+        self.events = arvados.events.subscribe(api_client,
+                                 [["event_type", "in", ["create", "update", "delete"]]],
+                                 self.on_event)  # kept in self.events so destroy() can close it
+
+    def on_event(self, ev):
+        if 'event_type' in ev:
+            with llfuse.lock:
+                item = self.inodes.inode_cache.find(ev["object_uuid"])
+                if item is not None:
+                    item.invalidate()
+                    if ev["object_kind"] == "arvados#collection":
+                        item.update(to_pdh=ev.get("properties", {}).get("new_attributes", {}).get("portable_data_hash"))
+                    else:
+                        item.update()
+
+                oldowner = ev.get("properties", {}).get("old_attributes", {}).get("owner_uuid")
+                olditemparent = self.inodes.inode_cache.find(oldowner)
+                if olditemparent is not None:
+                    olditemparent.invalidate()
+                    olditemparent.update()
+
+                itemparent = self.inodes.inode_cache.find(ev["object_owner_uuid"])
+                if itemparent is not None:
+                    itemparent.invalidate()
+                    itemparent.update()
+
+    @catch_exceptions
     def getattr(self, inode):
         if inode not in self.inodes:
             raise llfuse.FUSEError(errno.ENOENT)
@@ -197,10 +334,13 @@ class Operations(llfuse.Operations):
         entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
         if isinstance(e, Directory):
             entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
-        elif isinstance(e, StreamReaderFile):
-            entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFREG
         else:
             entry.st_mode |= stat.S_IFREG
+            if isinstance(e, FuseArvadosFile):
+                entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
+
+        if e.writable():
+            entry.st_mode |= stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
 
         entry.st_nlink = 1
         entry.st_uid = self.uid
@@ -217,10 +357,22 @@ class Operations(llfuse.Operations):
 
         return entry
 
+    @catch_exceptions
+    def setattr(self, inode, attr):
+        entry = self.getattr(inode)
+
+        e = self.inodes[inode]
+
+        if attr.st_size is not None and isinstance(e, FuseArvadosFile):
+            with llfuse.lock_released:
+                e.arvfile.truncate(attr.st_size)
+                entry.st_size = e.arvfile.size()
+
+        return entry
+
+    @catch_exceptions
     def lookup(self, parent_inode, name):
         name = unicode(name, self.encoding)
-        _logger.debug("arv-mount lookup: parent_inode %i name %s",
-                      parent_inode, name)
         inode = None
 
         if name == '.':
@@ -234,28 +386,42 @@ class Operations(llfuse.Operations):
                     inode = p[name].inode
 
         if inode != None:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' inode %i",
+                      parent_inode, name, inode)
+            self.inodes[inode].inc_ref()
             return self.getattr(inode)
         else:
+            _logger.debug("arv-mount lookup: parent_inode %i name '%s' not found",
+                      parent_inode, name)
             raise llfuse.FUSEError(errno.ENOENT)
 
+    @catch_exceptions
+    def forget(self, inodes):
+        for inode, nlookup in inodes:
+            ent = self.inodes[inode]
+            _logger.debug("arv-mount forget: inode %i nlookup %i ref_count %i", inode, nlookup, ent.ref_count)
+            if ent.dec_ref(nlookup) == 0 and ent.dead:
+                self.inodes.del_entry(ent)
+
+    @catch_exceptions
     def open(self, inode, flags):
         if inode in self.inodes:
             p = self.inodes[inode]
         else:
             raise llfuse.FUSEError(errno.ENOENT)
 
-        if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
-            raise llfuse.FUSEError(errno.EROFS)
-
         if isinstance(p, Directory):
             raise llfuse.FUSEError(errno.EISDIR)
 
-        fh = self._filehandles_counter
-        self._filehandles_counter += 1
+        if ((flags & os.O_WRONLY) or (flags & os.O_RDWR)) and not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        fh = next(self._filehandles_counter)
         self._filehandles[fh] = FileHandle(fh, p)
         self.inodes.touch(p)
         return fh
 
+    @catch_exceptions
     def read(self, fh, off, size):
         _logger.debug("arv-mount read %i %i %i", fh, off, size)
         if fh in self._filehandles:
@@ -263,20 +429,38 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        self.inodes.touch(handle.fileobj)
+        self.inodes.touch(handle.obj)
 
         try:
-            with llfuse.lock_released:
-                return handle.fileobj.readfrom(off, size)
+            return handle.obj.readfrom(off, size, self.num_retries)
         except arvados.errors.NotFoundError as e:
-            _logger.warning("Block not found: " + str(e))
-            raise llfuse.FUSEError(errno.EIO)
-        except Exception:
-            _logger.exception()
+            _logger.error("Block not found: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
 
+    @catch_exceptions
+    def write(self, fh, off, buf):
+        _logger.debug("arv-mount write %i %i %i", fh, off, len(buf))
+        if fh in self._filehandles:
+            handle = self._filehandles[fh]
+        else:
+            raise llfuse.FUSEError(errno.EBADF)
+
+        if not handle.obj.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        self.inodes.touch(handle.obj)
+
+        return handle.obj.writeto(off, buf, self.num_retries)
+
+    @catch_exceptions
     def release(self, fh):
         if fh in self._filehandles:
+            try:
+                self._filehandles[fh].flush()
+            except EnvironmentError as e:
+                raise llfuse.FUSEError(e.errno)
+            except Exception:
+                _logger.exception("Flush error")
             self._filehandles[fh].release()
             del self._filehandles[fh]
         self.inodes.inode_cache.cap_cache()
@@ -284,6 +468,7 @@ class Operations(llfuse.Operations):
     def releasedir(self, fh):
         self.release(fh)
 
+    @catch_exceptions
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -295,8 +480,7 @@ class Operations(llfuse.Operations):
         if not isinstance(p, Directory):
             raise llfuse.FUSEError(errno.ENOTDIR)
 
-        fh = self._filehandles_counter
-        self._filehandles_counter += 1
+        fh = next(self._filehandles_counter)
         if p.parent_inode in self.inodes:
             parent = self.inodes[p.parent_inode]
         else:
@@ -308,7 +492,7 @@ class Operations(llfuse.Operations):
         self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
         return fh
 
-
+    @catch_exceptions
     def readdir(self, fh, off):
         _logger.debug("arv-mount readdir: fh %i off %i", fh, off)
 
@@ -317,7 +501,7 @@ class Operations(llfuse.Operations):
         else:
             raise llfuse.FUSEError(errno.EBADF)
 
-        _logger.debug("arv-mount handle.dirobj %s", handle.dirobj)
+        _logger.debug("arv-mount handle.dirobj %s", handle.obj)
 
         e = off
         while e < len(handle.entries):
@@ -328,9 +512,10 @@ class Operations(llfuse.Operations):
                     pass
             e += 1
 
+    @catch_exceptions
     def statfs(self):
         st = llfuse.StatvfsData()
-        st.f_bsize = 64 * 1024
+        st.f_bsize = 128 * 1024
         st.f_blocks = 0
         st.f_files = 0
 
@@ -343,12 +528,73 @@ class Operations(llfuse.Operations):
         st.f_frsize = 0
         return st
 
-    # The llfuse documentation recommends only overloading functions that
-    # are actually implemented, as the default implementation will raise ENOSYS.
-    # However, there is a bug in the llfuse default implementation of create()
-    # "create() takes exactly 5 positional arguments (6 given)" which will crash
-    # arv-mount.
-    # The workaround is to implement it with the proper number of parameters,
-    # and then everything works out.
+    def _check_writable(self, inode_parent):
+        if inode_parent in self.inodes:
+            p = self.inodes[inode_parent]
+        else:
+            raise llfuse.FUSEError(errno.ENOENT)
+
+        if not isinstance(p, Directory):
+            raise llfuse.FUSEError(errno.ENOTDIR)
+
+        if not p.writable():
+            raise llfuse.FUSEError(errno.EPERM)
+
+        return p
+
+    @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx):
-        raise llfuse.FUSEError(errno.EROFS)
+        p = self._check_writable(inode_parent)
+        p.create(name)
+
+        # The file entry should have been implicitly created by callback.
+        f = p[name]
+        fh = next(self._filehandles_counter)
+        self._filehandles[fh] = FileHandle(fh, f)
+        self.inodes.touch(p)
+
+        f.inc_ref()
+        return (fh, self.getattr(f.inode))
+
+    @catch_exceptions
+    def mkdir(self, inode_parent, name, mode, ctx):
+        _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
+
+        p = self._check_writable(inode_parent)
+        p.mkdir(name)
+
+        # The dir entry should have been implicitly created by callback.
+        d = p[name]
+
+        d.inc_ref()
+        return self.getattr(d.inode)
+
+    @catch_exceptions
+    def unlink(self, inode_parent, name):
+        _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
+        p = self._check_writable(inode_parent)
+        p.unlink(name)
+
+    @catch_exceptions
+    def rmdir(self, inode_parent, name):
+        _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
+        p = self._check_writable(inode_parent)
+        p.rmdir(name)
+
+    @catch_exceptions
+    def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
+        _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
+        src = self._check_writable(inode_parent_old)
+        dest = self._check_writable(inode_parent_new)
+        dest.rename(name_old, name_new, src)
+
+    @catch_exceptions
+    def flush(self, fh):
+        if fh in self._filehandles:
+            self._filehandles[fh].flush()
+
+    def fsync(self, fh, datasync):
+        self.flush(fh)
+
+    def fsyncdir(self, fh, datasync):
+        self.flush(fh)
index 5acadfdf7a4fb9b41b6f8c32515a8675cc7e9adf..6ecf35c612906eda1a32b6f62fde76f6fd8eb146 100644 (file)
@@ -22,8 +22,39 @@ def use_counter(orig_func):
             self.dec_use()
     return use_counter_wrapper
 
+def check_update(orig_func):
+    @functools.wraps(orig_func)
+    def check_update_wrapper(self, *args, **kwargs):
+        self.checkupdate()
+        return orig_func(self, *args, **kwargs)
+    return check_update_wrapper
+
 class FreshBase(object):
-    """Base class for maintaining fresh/stale state to determine when to update."""
+    """Base class for maintaining object lifecycle.
+
+    Functions include:
+
+    * Indicate if an object is up to date (stale() == False) or needs to be
+      updated (stale() == True).  Use invalidate() to mark the object as
+      stale.  An object is also automatically stale if it has not been updated
+      in `_poll_time` seconds.
+
+    * Record access time (atime) timestamp
+
+    * Manage internal use count used by the inode cache ("inc_use" and
+      "dec_use").  An object which is in use cannot be cleared by the inode
+      cache.
+
+    * Manage the kernel reference count ("inc_ref" and "dec_ref").  An object
+      which is referenced by the kernel cannot have its inode entry deleted.
+
+    * Record cache footprint, cache priority
+
+    * Record Arvados uuid at the time the object is placed in the cache
+
+    * Clear the object contents (invalidates the object)
+
+    """
     def __init__(self):
         self._stale = True
         self._poll = False
@@ -31,8 +62,11 @@ class FreshBase(object):
         self._atime = time.time()
         self._poll_time = 60
         self.use_count = 0
-        self.cache_priority = 0
+        self.ref_count = 0
+        self.dead = False
+        self.cache_priority = None
         self.cache_size = 0
+        self.cache_uuid = None
 
     # Mark the value as stale
     def invalidate(self):
@@ -68,5 +102,16 @@ class FreshBase(object):
     def dec_use(self):
         self.use_count -= 1
 
+    def inc_ref(self):
+        """Increment `ref_count` by one and return the new count."""
+        self.ref_count += 1
+        return self.ref_count
+
+    def dec_ref(self, n):
+        """Subtract `n` from `ref_count` and return the new count."""
+        self.ref_count -= n
+        return self.ref_count
+
     def objsize(self):
         return 0
+
+    def uuid(self):
+        return None
index 2757091aa420ef791e20fd88908c5b2b6270f163..85f4bca833f820835d72057875536a3b53944fdb 100644 (file)
@@ -5,10 +5,14 @@ import llfuse
 import arvados
 import apiclient
 import functools
+import threading
+from apiclient import errors as apiclient_errors
+import errno
 
-from fusefile import StringFile, StreamReaderFile, ObjectFile
-from fresh import FreshBase, convertTime, use_counter
+from fusefile import StringFile, ObjectFile, FuseArvadosFile
+from fresh import FreshBase, convertTime, use_counter, check_update
 
+import arvados.collection
 from arvados.util import portable_data_hash_pattern, uuid_pattern, collection_uuid_pattern, group_uuid_pattern, user_uuid_pattern, link_uuid_pattern
 
 _logger = logging.getLogger('arvados.arvados_fuse')
@@ -41,9 +45,10 @@ class Directory(FreshBase):
     """
 
     def __init__(self, parent_inode, inodes):
+        """parent_inode is the integer inode number"""
+
         super(Directory, self).__init__()
 
-        """parent_inode is the integer inode number"""
         self.inode = None
         if not isinstance(parent_inode, int):
             raise Exception("parent_inode should be an int")
@@ -71,23 +76,28 @@ class Directory(FreshBase):
             try:
                 self.update()
             except apiclient.errors.HttpError as e:
-                _logger.debug(e)
+                _logger.warn(e)
 
     @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         return self._entries[item]
 
     @use_counter
+    @check_update
     def items(self):
-        self.checkupdate()
         return list(self._entries.items())
 
     @use_counter
+    @check_update
     def __contains__(self, k):
-        self.checkupdate()
         return k in self._entries
 
+    @use_counter
+    @check_update
+    def __len__(self):
+        return len(self._entries)
+
     def fresh(self):
         self.inodes.touch(self)
         super(Directory, self).fresh()
@@ -124,6 +134,7 @@ class Directory(FreshBase):
                     self._entries[name] = oldentries[name]
                     del oldentries[name]
                 else:
+                    _logger.debug("Adding entry '%s' to inode %i", name, self.inode)
                     # create new directory entry
                     ent = new_entry(i)
                     if ent is not None:
@@ -132,11 +143,13 @@ class Directory(FreshBase):
 
         # delete any other directory entries that were not in found in 'items'
         for i in oldentries:
+            _logger.debug("Forgetting about entry '%s' on inode %i", str(i), self.inode)
             llfuse.invalidate_entry(self.inode, str(i))
             self.inodes.del_entry(oldentries[i])
             changed = True
 
         if changed:
+            llfuse.invalidate_inode(self.inode)
             self._mtime = time.time()
 
         self.fresh()
@@ -163,27 +176,171 @@ class Directory(FreshBase):
     def mtime(self):
         return self._mtime
 
+    def writable(self):
+        return False
+
+    def flush(self):
+        pass
+
+    def create(self, name):
+        raise NotImplementedError()
+
+    def mkdir(self, name):
+        raise NotImplementedError()
+
+    def unlink(self, name):
+        raise NotImplementedError()
+
+    def rmdir(self, name):
+        raise NotImplementedError()
+
+    def rename(self, name_old, name_new, src):
+        raise NotImplementedError()
+
+
+class CollectionDirectoryBase(Directory):
+    """Represent an Arvados Collection as a directory.
+
+    This class is used for Subcollections, and is also the base class for
+    CollectionDirectory, which implements collection loading/saving on
+    Collection records.
+
+    Most operations act only on the underlying Arvados `Collection` object.  The
+    `Collection` object signals via a notify callback to
+    `CollectionDirectoryBase.on_event` that an item was added, removed or
+    modified.  FUSE inodes and directory entries are created, deleted or
+    invalidated in response to these events.
+
+    """
+
+    def __init__(self, parent_inode, inodes, collection):
+        super(CollectionDirectoryBase, self).__init__(parent_inode, inodes)
+        self.collection = collection
+
+    def new_entry(self, name, item, mtime):
+        """Add a directory entry for collection item `item`.
+
+        The entry is stored in `self._entries` under the sanitized form of
+        `name`, and `item.fuse_entry` is set to point back at it.
+
+        Raises Exception if `item` already has a fuse entry that is not
+        marked dead, or whose inode is None.
+        """
+        name = sanitize_filename(name)
+        if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+            # The item already has a FUSE entry: re-attach it here instead of
+            # creating a new one.  Only entries flagged dead may be reused.
+            if item.fuse_entry.dead is not True:
+                raise Exception("Can only reparent dead inode entry")
+            if item.fuse_entry.inode is None:
+                raise Exception("Reparented entry must still have valid inode")
+            item.fuse_entry.dead = False
+            self._entries[name] = item.fuse_entry
+        elif isinstance(item, arvados.collection.RichCollectionBase):
+            # Collection-like item: represent it as a nested directory and
+            # fill in its children.
+            self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, item))
+            self._entries[name].populate(mtime)
+        else:
+            # Anything else is wrapped as a file.
+            self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+        item.fuse_entry = self._entries[name]
+
+    def on_event(self, event, collection, name, item):
+        """Apply a change notification from the Collection to this directory.
+
+        Events for any collection other than `self.collection` are ignored.
+        For the others, while holding `llfuse.lock`:
+
+        * ADD creates (or re-attaches) the entry via new_entry().
+        * DEL removes the entry, invalidates the kernel's directory entry
+          and hands the entry to `self.inodes.del_entry()`.
+        * MOD invalidates the inode of the affected entry, if there is one.
+
+        Raises KeyError on DEL if no entry exists under the sanitized name.
+        """
+        if collection == self.collection:
+            _logger.debug("%s %s %s %s", event, collection, name, item)
+            # new_entry() stores entries under the sanitized name, so lookups
+            # and kernel invalidation must use the same sanitized name.
+            entry_name = sanitize_filename(name)
+            with llfuse.lock:
+                if event == arvados.collection.ADD:
+                    # new_entry() sanitizes the name itself.
+                    self.new_entry(name, item, self.mtime())
+                elif event == arvados.collection.DEL:
+                    ent = self._entries[entry_name]
+                    del self._entries[entry_name]
+                    llfuse.invalidate_entry(self.inode, entry_name)
+                    self.inodes.del_entry(ent)
+                elif event == arvados.collection.MOD:
+                    if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
+                        llfuse.invalidate_inode(item.fuse_entry.inode)
+                    elif entry_name in self._entries:
+                        llfuse.invalidate_inode(self._entries[entry_name].inode)
+
+    def populate(self, mtime):
+        """Set the directory mtime, subscribe to collection events and add entries.
+
+        Registers `self.on_event` as a subscriber on `self.collection`, then
+        calls new_entry() for every (name, item) pair currently in the
+        collection.
+        """
+        self._mtime = mtime
+        self.collection.subscribe(self.on_event)
+        for entry, item in self.collection.items():
+            self.new_entry(entry, item, self.mtime())
+
+    def writable(self):
+        return self.collection.writable()
+
+    @use_counter
+    def flush(self):
+        with llfuse.lock_released:
+            self.collection.root_collection().save()
+
+    @use_counter
+    @check_update
+    def create(self, name):
+        with llfuse.lock_released:
+            self.collection.open(name, "w").close()
+
+    @use_counter
+    @check_update
+    def mkdir(self, name):
+        with llfuse.lock_released:
+            self.collection.mkdirs(name)
+
+    @use_counter
+    @check_update
+    def unlink(self, name):
+        with llfuse.lock_released:
+            self.collection.remove(name)
+        self.flush()
+
+    @use_counter
+    @check_update
+    def rmdir(self, name):
+        with llfuse.lock_released:
+            self.collection.remove(name)
+        self.flush()
 
-class CollectionDirectory(Directory):
-    """Represents the root of a directory tree holding a collection."""
+    @use_counter
+    @check_update
+    def rename(self, name_old, name_new, src):
+        if not isinstance(src, CollectionDirectoryBase):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            ent = src[name_old]
+            tgt = self[name_new]
+            if isinstance(ent, FuseArvadosFile) and isinstance(tgt, FuseArvadosFile):
+                pass
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, CollectionDirectoryBase):
+                if len(tgt) > 0:
+                    raise llfuse.FUSEError(errno.ENOTEMPTY)
+            elif isinstance(ent, CollectionDirectoryBase) and isinstance(tgt, FuseArvadosFile):
+                raise llfuse.FUSEError(errno.ENOTDIR)
+            elif isinstance(ent, FuseArvadosFile) and isinstance(tgt, CollectionDirectoryBase):
+                raise llfuse.FUSEError(errno.EISDIR)
 
-    def __init__(self, parent_inode, inodes, api, num_retries, collection):
-        super(CollectionDirectory, self).__init__(parent_inode, inodes)
+        with llfuse.lock_released:
+            self.collection.rename(name_old, name_new, source_collection=src.collection, overwrite=True)
+        self.flush()
+        src.flush()
+
+
+class CollectionDirectory(CollectionDirectoryBase):
+    """Represents the root of a directory tree representing a collection."""
+
+    def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
+        super(CollectionDirectory, self).__init__(parent_inode, inodes, None)
         self.api = api
         self.num_retries = num_retries
-        self.collection_object_file = None
-        self.collection_object = None
-        if isinstance(collection, dict):
-            self.collection_locator = collection['uuid']
-            self._mtime = convertTime(collection.get('modified_at'))
+        self.collection_record_file = None
+        self.collection_record = None
+        if isinstance(collection_record, dict):
+            self.collection_locator = collection_record['uuid']
+            self._mtime = convertTime(collection_record.get('modified_at'))
         else:
-            self.collection_locator = collection
+            self.collection_locator = collection_record
             self._mtime = 0
         self._manifest_size = 0
+        if self.collection_locator:
+            self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+        self._updating_lock = threading.Lock()
 
     def same(self, i):
         return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
 
+    def writable(self):
+        return self.collection.writable() if self.collection is not None else self._writable
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -192,82 +349,99 @@ class CollectionDirectory(Directory):
         """
 
         self.collection_locator = new_locator
-        self.collection_object = None
+        self.collection_record = None
         self.update()
 
-    def new_collection(self, new_collection_object, coll_reader):
-        self.clear(force=True)
+    def new_collection(self, new_collection_record, coll_reader):
+        if self.inode:
+            self.clear(force=True)
 
-        self.collection_object = new_collection_object
+        self.collection_record = new_collection_record
 
-        self._mtime = convertTime(self.collection_object.get('modified_at'))
+        if self.collection_record:
+            self._mtime = convertTime(self.collection_record.get('modified_at'))
+            self.collection_locator = self.collection_record["uuid"]
+            if self.collection_record_file is not None:
+                self.collection_record_file.update(self.collection_record)
 
-        if self.collection_object_file is not None:
-            self.collection_object_file.update(self.collection_object)
+        self.collection = coll_reader
+        self.populate(self.mtime())
 
-        for s in coll_reader.all_streams():
-            cwd = self
-            for part in s.name().split('/'):
-                if part != '' and part != '.':
-                    partname = sanitize_filename(part)
-                    if partname not in cwd._entries:
-                        cwd._entries[partname] = self.inodes.add_entry(Directory(cwd.inode, self.inodes))
-                    cwd = cwd._entries[partname]
-            for k, v in s.files().items():
-                cwd._entries[sanitize_filename(k)] = self.inodes.add_entry(StreamReaderFile(cwd.inode, v, self.mtime()))
+    def uuid(self):
+        return self.collection_locator
 
-    def update(self):
+    @use_counter
+    def update(self, to_pdh=None):
         try:
-            if self.collection_object is not None and portable_data_hash_pattern.match(self.collection_locator):
+            if self.collection_record is not None and portable_data_hash_pattern.match(self.collection_locator):
                 return True
 
             if self.collection_locator is None:
                 self.fresh()
                 return True
 
-            with llfuse.lock_released:
-                coll_reader = arvados.CollectionReader(
-                    self.collection_locator, self.api, self.api.keep,
-                    num_retries=self.num_retries)
-                new_collection_object = coll_reader.api_response() or {}
-                # If the Collection only exists in Keep, there will be no API
-                # response.  Fill in the fields we need.
-                if 'uuid' not in new_collection_object:
-                    new_collection_object['uuid'] = self.collection_locator
-                if "portable_data_hash" not in new_collection_object:
-                    new_collection_object["portable_data_hash"] = new_collection_object["uuid"]
-                if 'manifest_text' not in new_collection_object:
-                    new_collection_object['manifest_text'] = coll_reader.manifest_text()
-                coll_reader.normalize()
-            # end with llfuse.lock_released, re-acquire lock
-
-            if self.collection_object is None or self.collection_object["portable_data_hash"] != new_collection_object["portable_data_hash"]:
-                self.new_collection(new_collection_object, coll_reader)
-
-            self._manifest_size = len(coll_reader.manifest_text())
-            _logger.debug("%s manifest_size %i", self, self._manifest_size)
+            try:
+                with llfuse.lock_released:
+                    self._updating_lock.acquire()
+                    if not self.stale():
+                        return
+
+                    _logger.debug("Updating %s", self.collection_locator)
+                    if self.collection:
+                        if self.collection.portable_data_hash() == to_pdh:
+                            _logger.debug("%s is fresh at pdh '%s'", self.collection_locator, to_pdh)
+                        else:
+                            self.collection.update()
+                    else:
+                        if uuid_pattern.match(self.collection_locator):
+                            coll_reader = arvados.collection.Collection(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        else:
+                            coll_reader = arvados.collection.CollectionReader(
+                                self.collection_locator, self.api, self.api.keep,
+                                num_retries=self.num_retries)
+                        new_collection_record = coll_reader.api_response() or {}
+                        # If the Collection only exists in Keep, there will be no API
+                        # response.  Fill in the fields we need.
+                        if 'uuid' not in new_collection_record:
+                            new_collection_record['uuid'] = self.collection_locator
+                        if "portable_data_hash" not in new_collection_record:
+                            new_collection_record["portable_data_hash"] = new_collection_record["uuid"]
+                        if 'manifest_text' not in new_collection_record:
+                            new_collection_record['manifest_text'] = coll_reader.manifest_text()
+
+                        if self.collection_record is None or self.collection_record["portable_data_hash"] != new_collection_record.get("portable_data_hash"):
+                            self.new_collection(new_collection_record, coll_reader)
+
+                        self._manifest_size = len(coll_reader.manifest_text())
+                        _logger.debug("%s manifest_size %i", self, self._manifest_size)
+                # end with llfuse.lock_released, re-acquire lock
 
-            self.fresh()
-            return True
+                self.fresh()
+                return True
+            finally:
+                self._updating_lock.release()
         except arvados.errors.NotFoundError:
             _logger.exception("arv-mount %s: error", self.collection_locator)
         except arvados.errors.ArgumentError as detail:
             _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         except Exception:
             _logger.exception("arv-mount %s: error", self.collection_locator)
-            if self.collection_object is not None and "manifest_text" in self.collection_object:
-                _logger.error("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
+            if self.collection_record is not None and "manifest_text" in self.collection_record:
+                _logger.error("arv-mount manifest_text is: %s", self.collection_record["manifest_text"])
         return False
 
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#collection':
-            if self.collection_object_file is None:
-                self.collection_object_file = ObjectFile(self.inode, self.collection_object)
-                self.inodes.add_entry(self.collection_object_file)
-            return self.collection_object_file
+            if self.collection_record_file is None:
+                self.collection_record_file = ObjectFile(self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
         else:
             return super(CollectionDirectory, self).__getitem__(item)
 
@@ -278,8 +452,8 @@ class CollectionDirectory(Directory):
             return super(CollectionDirectory, self).__contains__(k)
 
     def invalidate(self):
-        self.collection_object = None
-        self.collection_object_file = None
+        self.collection_record = None
+        self.collection_record_file = None
         super(CollectionDirectory, self).invalidate()
 
     def persisted(self):
@@ -291,6 +465,7 @@ class CollectionDirectory(Directory):
         # footprint directly would be more accurate, but also more complicated.
         return self._manifest_size * 128
 
+
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
 
@@ -380,6 +555,7 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             tags = self.api.links().list(
@@ -407,6 +583,7 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             taggedcollections = self.api.links().list(
@@ -431,9 +608,11 @@ class ProjectDirectory(Directory):
         self.num_retries = num_retries
         self.project_object = project_object
         self.project_object_file = None
-        self.uuid = project_object['uuid']
+        self.project_uuid = project_object['uuid']
         self._poll = poll
         self._poll_time = poll_time
+        self._updating_lock = threading.Lock()
+        self._current_user = None
 
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
@@ -450,6 +629,10 @@ class ProjectDirectory(Directory):
         else:
             return None
 
+    def uuid(self):
+        return self.project_uuid
+
+    @use_counter
     def update(self):
         if self.project_object_file == None:
             self.project_object_file = ObjectFile(self.inode, self.project_object)
@@ -472,34 +655,40 @@ class ProjectDirectory(Directory):
                 return None
 
         def samefn(a, i):
-            if isinstance(a, CollectionDirectory):
-                return a.collection_locator == i['uuid']
-            elif isinstance(a, ProjectDirectory):
-                return a.uuid == i['uuid']
+            if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
+                return a.uuid() == i['uuid']
             elif isinstance(a, ObjectFile):
-                return a.uuid == i['uuid'] and not a.stale()
+                return a.uuid() == i['uuid'] and not a.stale()
             return False
 
-        with llfuse.lock_released:
-            if group_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.groups().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
-            elif user_uuid_pattern.match(self.uuid):
-                self.project_object = self.api.users().get(
-                    uuid=self.uuid).execute(num_retries=self.num_retries)
+        try:
+            with llfuse.lock_released:
+                self._updating_lock.acquire()
+                if not self.stale():
+                    return
 
-            contents = arvados.util.list_all(self.api.groups().contents,
-                                             self.num_retries, uuid=self.uuid)
+                if group_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.groups().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
+                elif user_uuid_pattern.match(self.project_uuid):
+                    self.project_object = self.api.users().get(
+                        uuid=self.project_uuid).execute(num_retries=self.num_retries)
 
-        # end with llfuse.lock_released, re-acquire lock
+                contents = arvados.util.list_all(self.api.groups().contents,
+                                                 self.num_retries, uuid=self.project_uuid)
 
-        self.merge(contents,
-                   namefn,
-                   samefn,
-                   self.createDirectory)
+            # end with llfuse.lock_released, re-acquire lock
 
+            self.merge(contents,
+                       namefn,
+                       samefn,
+                       self.createDirectory)
+        finally:
+            self._updating_lock.release()
+
+    @use_counter
+    @check_update
     def __getitem__(self, item):
-        self.checkupdate()
         if item == '.arvados#project':
             return self.project_object_file
         else:
@@ -511,8 +700,69 @@ class ProjectDirectory(Directory):
         else:
             return super(ProjectDirectory, self).__contains__(k)
 
+    @use_counter
+    @check_update
+    def writable(self):
+        """Return True if the current user's uuid is in the project's "writable_by" list.
+
+        The current user record is fetched from the API on first use and
+        cached in `self._current_user`.  The llfuse lock is released for the
+        duration of the check.
+        """
+        with llfuse.lock_released:
+            if not self._current_user:
+                self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
+            return self._current_user["uuid"] in self.project_object["writable_by"]
+
     def persisted(self):
-        return False
+        return True
+
+    @use_counter
+    @check_update
+    def mkdir(self, name):
+        """Create an empty collection called `name` owned by this project.
+
+        The API request is made with the llfuse lock released; afterwards the
+        directory is invalidated.
+
+        Raises llfuse.FUSEError(EEXIST) if the API client reports any error
+        (the error itself is logged).
+        """
+        try:
+            with llfuse.lock_released:
+                self.api.collections().create(body={"owner_uuid": self.project_uuid,
+                                                    "name": name,
+                                                    "manifest_text": ""}).execute(num_retries=self.num_retries)
+            self.invalidate()
+        except apiclient_errors.Error as error:
+            _logger.error(error)
+            raise llfuse.FUSEError(errno.EEXIST)
+
+    @use_counter
+    @check_update
+    def rmdir(self, name):
+        """Delete the empty collection shown as `name` in this project.
+
+        Raises llfuse.FUSEError with:
+          ENOENT    if `name` is not an entry of this directory,
+          EPERM     if the entry is not a CollectionDirectory,
+          ENOTEMPTY if the collection directory still has entries.
+
+        The delete request is made with the llfuse lock released; afterwards
+        the directory is invalidated.
+        """
+        if name not in self:
+            raise llfuse.FUSEError(errno.ENOENT)
+        if not isinstance(self[name], CollectionDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+        if len(self[name]) > 0:
+            raise llfuse.FUSEError(errno.ENOTEMPTY)
+        with llfuse.lock_released:
+            self.api.collections().delete(uuid=self[name].uuid()).execute(num_retries=self.num_retries)
+        self.invalidate()
+
+    @use_counter
+    @check_update
+    def rename(self, name_old, name_new, src):
+        """Move collection `name_old` from project `src` into this project as `name_new`.
+
+        Only collections can be moved, only between ProjectDirectory objects,
+        and only to a name that does not already exist here.
+
+        Raises llfuse.FUSEError(EPERM) if `src` is not a ProjectDirectory, if
+        the source entry is not a CollectionDirectory, or if `name_new` is
+        already present in this directory.
+        """
+        if not isinstance(src, ProjectDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        ent = src[name_old]
+
+        if not isinstance(ent, CollectionDirectory):
+            raise llfuse.FUSEError(errno.EPERM)
+
+        if name_new in self:
+            # POSIX semantics for replacing one directory with another is
+            # tricky (the target directory must be empty, the operation must be
+            # atomic which isn't possible with the Arvados API as of this
+            # writing) so don't support that.
+            raise llfuse.FUSEError(errno.EPERM)
+
+        # Update owner and name on the API server first.
+        self.api.collections().update(uuid=ent.uuid(),
+                                      body={"owner_uuid": self.uuid(),
+                                            "name": name_new}).execute(num_retries=self.num_retries)
+
+        # Actually move the entry from source directory to this directory.
+        del src._entries[name_old]
+        self._entries[name_new] = ent
+        llfuse.invalidate_entry(src.inode, name_old)
 
 
 class SharedDirectory(Directory):
@@ -527,6 +777,7 @@ class SharedDirectory(Directory):
         self._poll = True
         self._poll_time = poll_time
 
+    @use_counter
     def update(self):
         with llfuse.lock_released:
             all_projects = arvados.util.list_all(
@@ -562,11 +813,14 @@ class SharedDirectory(Directory):
             for r in root_owners:
                 if r in objects:
                     obr = objects[r]
-                    if "name" in obr:
+                    if obr.get("name"):
                         contents[obr["name"]] = obr
-                    if "first_name" in obr:
+                    #elif obr.get("username"):
+                    #    contents[obr["username"]] = obr
+                    elif "first_name" in obr:
                         contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
 
+
             for r in roots:
                 if r['owner_uuid'] not in objects:
                     contents[r['name']] = r
@@ -576,7 +830,7 @@ class SharedDirectory(Directory):
         try:
             self.merge(contents.items(),
                        lambda i: i[0],
-                       lambda a, i: a.uuid == i[1]['uuid'],
+                       lambda a, i: a.uuid() == i[1]['uuid'],
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
             _logger.exception()
index efe31c387c09432f3905144c19b5417161d6a1a7..d33f9f9e41655eede0e794027de9d8d4546248b4 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import re
 import json
+import llfuse
 
 from fresh import FreshBase, convertTime
 
@@ -18,32 +19,54 @@ class File(FreshBase):
     def size(self):
         return 0
 
-    def readfrom(self, off, size):
+    def readfrom(self, off, size, num_retries=0):
         return ''
 
+    def writeto(self, off, size, num_retries=0):
+        raise Exception("Not writable")
+
     def mtime(self):
         return self._mtime
 
     def clear(self, force=False):
         return True
 
+    def writable(self):
+        return False
+
+    def flush(self):
+        pass
 
-class StreamReaderFile(File):
-    """Wraps a StreamFileReader as a file."""
 
-    def __init__(self, parent_inode, reader, _mtime):
-        super(StreamReaderFile, self).__init__(parent_inode, _mtime)
-        self.reader = reader
+class FuseArvadosFile(File):
+    """Wraps a ArvadosFile."""
+
+    def __init__(self, parent_inode, arvfile, _mtime):
+        super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
+        self.arvfile = arvfile
 
     def size(self):
-        return self.reader.size()
+        return self.arvfile.size()
+
+    def readfrom(self, off, size, num_retries=0):
+        with llfuse.lock_released:
+            return self.arvfile.readfrom(off, size, num_retries, exact=True)
 
-    def readfrom(self, off, size):
-        return self.reader.readfrom(off, size)
+    def writeto(self, off, buf, num_retries=0):
+        with llfuse.lock_released:
+            return self.arvfile.writeto(off, buf, num_retries)
 
     def stale(self):
         return False
 
+    def writable(self):
+        return self.arvfile.writable()
+
+    def flush(self):
+        """Save the root collection containing this file, if the file is writable.
+
+        The llfuse lock is released while the check and save run.
+        """
+        with llfuse.lock_released:
+            if self.writable():
+                self.arvfile.parent.root_collection().save()
+
 
 class StringFile(File):
     """Wrap a simple string as a file"""
@@ -54,7 +77,7 @@ class StringFile(File):
     def size(self):
         return len(self.contents)
 
-    def readfrom(self, off, size):
+    def readfrom(self, off, size, num_retries=0):
         return self.contents[off:(off+size)]
 
 
@@ -63,9 +86,15 @@ class ObjectFile(StringFile):
 
     def __init__(self, parent_inode, obj):
         super(ObjectFile, self).__init__(parent_inode, "", 0)
-        self.uuid = obj['uuid']
+        self.object_uuid = obj['uuid']
         self.update(obj)
 
-    def update(self, obj):
+    def uuid(self):
+        return self.object_uuid
+
+    def update(self, obj=None):
+        """Refresh this file's mtime and contents from the record `obj`.
+
+        The JSON rendering of `obj` (sorted keys, 4-space indent, trailing
+        newline) becomes the file contents.  Its 'modified_at' value, if
+        present, sets the mtime; otherwise the mtime is set to 0.
+
+        When called without a record there is nothing to refresh from, so
+        the current contents are kept rather than failing on `None`.
+        """
+        if obj is None:
+            return
         self._mtime = convertTime(obj['modified_at']) if 'modified_at' in obj else 0
         self.contents = json.dumps(obj, indent=4, sort_keys=True) + "\n"
+
+    def persisted(self):
+        return True
index 5a3e0215388a3077b78a117e9064ff0acc8984a9..c3f4ab01ffb46abea8fb07419835a8eda91366dc 100755 (executable)
@@ -156,10 +156,16 @@ From here, the following directories are available:
     opts = [optname for optname in ['allow_other', 'debug']
             if getattr(args, optname)]
 
+    # Increase default read/write size from 4KiB to 128KiB
+    opts += ["big_writes", "max_read=131072"]
+
     if args.exec_args:
         # Initialize the fuse connection
         llfuse.init(operations, args.mountpoint, opts)
 
+        # Subscribe to change events from API server
+        operations.listen_for_events(api)
+
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
 
@@ -196,6 +202,10 @@ From here, the following directories are available:
     else:
         try:
             llfuse.init(operations, args.mountpoint, opts)
+
+            # Subscribe to change events from API server
+            operations.listen_for_events(api)
+
             llfuse.main()
         except Exception as e:
             logger.exception('arv-mount: exception during mount')
index 0ef3ea67df379806a56820ba273b482d61d0281b..34b5fc260d2d7e88747a6534ad5fd52ffd8b6950 100644 (file)
@@ -29,7 +29,7 @@ setup(name='arvados_fuse',
         'bin/arv-mount'
         ],
       install_requires=[
-        'arvados-python-client>=0.1.20150303143450',
+        'arvados-python-client',
         'llfuse',
         'python-daemon',
         'ciso8601'
index 61a365f8b06c9c7d5294d4662aa09aa118458191..cce5b95a80ed5c5ab0dfc6819ff96bb661f6387d 100644 (file)
@@ -140,6 +140,7 @@ class InodeTests(unittest.TestCase):
         # Delete ent1
         self.assertEqual(500, cache.total())
         ent1.clear.return_value = True
+        ent1.ref_count = 0
         inodes.del_entry(ent1)
         self.assertEqual(0, cache.total())
         cache.touch(ent3)
index 5535494a67b77089d233544e2c2c1e219eedc622..82d8ec78627a24bc2edaa6984bc4d753921f8128 100644 (file)
@@ -12,11 +12,23 @@ import tempfile
 import threading
 import time
 import unittest
-
+import logging
+import multiprocessing
 import run_test_server
 
+logger = logging.getLogger('arvados.arv-mount')
+
 class MountTestBase(unittest.TestCase):
     def setUp(self):
+        # The underlying C implementation of open() makes a fstat() syscall
+        # with the GIL still held.  When the GETATTR message comes back to
+        # llfuse (which in these tests is in the same interpreter process) it
+        # can't acquire the GIL, so it can't service the fstat() call, so it
+        # deadlocks.  The workaround is to run some of our test code in a
+        # separate process.  Fortunately the multiprocessing module makes this
+        # relatively easy.
+        self.pool = multiprocessing.Pool(1)
+
         self.keeptmp = tempfile.mkdtemp()
         os.environ['KEEP_LOCAL_STORE'] = self.keeptmp
         self.mounttmp = tempfile.mkdtemp()
@@ -25,15 +37,19 @@ class MountTestBase(unittest.TestCase):
         self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        operations = fuse.Operations(os.getuid(), os.getgid())
-        operations.inodes.add_entry(root_class(
-            llfuse.ROOT_INODE, operations.inodes, self.api, 0, **root_kwargs))
-        llfuse.init(operations, self.mounttmp, [])
+        self.operations = fuse.Operations(os.getuid(), os.getgid())
+        self.operations.inodes.add_entry(root_class(
+            llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+        llfuse.init(self.operations, self.mounttmp, [])
         threading.Thread(None, llfuse.main).start()
         # wait until the driver is finished initializing
-        operations.initlock.wait()
+        self.operations.initlock.wait()
+        return self.operations.inodes[llfuse.ROOT_INODE]
 
     def tearDown(self):
+        self.pool.close()
+        self.pool = None
+
         # llfuse.close is buggy, so use fusermount instead.
         #llfuse.close(unmount=True)
         count = 0
@@ -51,7 +67,7 @@ class MountTestBase(unittest.TestCase):
         path = self.mounttmp
         if subdir:
             path = os.path.join(path, subdir)
-        self.assertEqual(sorted(expect_content), sorted(os.listdir(path)))
+        self.assertEqual(sorted(expect_content), sorted(llfuse.listdir(path)))
 
 
 class FuseMountTest(MountTestBase):
@@ -98,7 +114,7 @@ class FuseMountTest(MountTestBase):
         self.api.collections().create(body={"manifest_text":cw.manifest_text()}).execute()
 
     def runTest(self):
-        self.make_mount(fuse.CollectionDirectory, collection=self.testcollection)
+        self.make_mount(fuse.CollectionDirectory, collection_record=self.testcollection)
 
         self.assertDirContents(None, ['thing1.txt', 'thing2.txt',
                                       'edgecases', 'dir1', 'dir2'])
@@ -157,7 +173,7 @@ class FuseMagicTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.MagicDirectory)
 
-        mount_ls = os.listdir(self.mounttmp)
+        mount_ls = llfuse.listdir(self.mounttmp)
         self.assertIn('README', mount_ls)
         self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
                              arvados.util.uuid_pattern.match(fn)
@@ -166,11 +182,11 @@ class FuseMagicTest(MountTestBase):
         self.assertDirContents(self.testcollection, ['thing1.txt'])
         self.assertDirContents(os.path.join('by_id', self.testcollection),
                                ['thing1.txt'])
-        mount_ls = os.listdir(self.mounttmp)
+        mount_ls = llfuse.listdir(self.mounttmp)
         self.assertIn('README', mount_ls)
         self.assertIn(self.testcollection, mount_ls)
         self.assertIn(self.testcollection,
-                      os.listdir(os.path.join(self.mounttmp, 'by_id')))
+                      llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
 
         files = {}
         files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
@@ -184,15 +200,15 @@ class FuseTagsTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.TagsDirectory)
 
-        d1 = os.listdir(self.mounttmp)
+        d1 = llfuse.listdir(self.mounttmp)
         d1.sort()
         self.assertEqual(['foo_tag'], d1)
 
-        d2 = os.listdir(os.path.join(self.mounttmp, 'foo_tag'))
+        d2 = llfuse.listdir(os.path.join(self.mounttmp, 'foo_tag'))
         d2.sort()
         self.assertEqual(['zzzzz-4zz18-fy296fx3hot09f7'], d2)
 
-        d3 = os.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
+        d3 = llfuse.listdir(os.path.join(self.mounttmp, 'foo_tag', 'zzzzz-4zz18-fy296fx3hot09f7'))
         d3.sort()
         self.assertEqual(['foo'], d3)
 
@@ -208,12 +224,12 @@ class FuseTagsUpdateTest(MountTestBase):
     def runTest(self):
         self.make_mount(fuse.TagsDirectory, poll_time=1)
 
-        self.assertIn('foo_tag', os.listdir(self.mounttmp))
+        self.assertIn('foo_tag', llfuse.listdir(self.mounttmp))
 
         bar_uuid = run_test_server.fixture('collections')['bar_file']['uuid']
         self.tag_collection(bar_uuid, 'fuse_test_tag')
         time.sleep(1)
-        self.assertIn('fuse_test_tag', os.listdir(self.mounttmp))
+        self.assertIn('fuse_test_tag', llfuse.listdir(self.mounttmp))
         self.assertDirContents('fuse_test_tag', [bar_uuid])
 
         baz_uuid = run_test_server.fixture('collections')['baz_file']['uuid']
@@ -234,14 +250,14 @@ class FuseSharedTest(MountTestBase):
         # shared_dirs is a list of the directories exposed
         # by fuse.SharedDirectory (i.e. any object visible
         # to the current user)
-        shared_dirs = os.listdir(self.mounttmp)
+        shared_dirs = llfuse.listdir(self.mounttmp)
         shared_dirs.sort()
         self.assertIn('FUSE User', shared_dirs)
 
         # fuse_user_objs is a list of the objects owned by the FUSE
         # test user (which present as files in the 'FUSE User'
         # directory)
-        fuse_user_objs = os.listdir(os.path.join(self.mounttmp, 'FUSE User'))
+        fuse_user_objs = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User'))
         fuse_user_objs.sort()
         self.assertEqual(['FUSE Test Project',                    # project owned by user
                           'collection #1 owned by FUSE',          # collection owned by user
@@ -250,7 +266,7 @@ class FuseSharedTest(MountTestBase):
                       ], fuse_user_objs)
 
         # test_proj_files is a list of the files in the FUSE Test Project.
-        test_proj_files = os.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
+        test_proj_files = llfuse.listdir(os.path.join(self.mounttmp, 'FUSE User', 'FUSE Test Project'))
         test_proj_files.sort()
         self.assertEqual(['collection in FUSE project',
                           'pipeline instance in FUSE project.pipelineInstance',
@@ -285,10 +301,10 @@ class FuseHomeTest(MountTestBase):
         self.make_mount(fuse.ProjectDirectory,
                         project_object=self.api.users().current().execute())
 
-        d1 = os.listdir(self.mounttmp)
+        d1 = llfuse.listdir(self.mounttmp)
         self.assertIn('Unrestricted public data', d1)
 
-        d2 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
+        d2 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data'))
         public_project = run_test_server.fixture('groups')[
             'anonymously_accessible_project']
         found_in = 0
@@ -308,10 +324,722 @@ class FuseHomeTest(MountTestBase):
         self.assertNotEqual(0, found_in)
         self.assertNotEqual(0, found_not_in)
 
-        d3 = os.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
+        d3 = llfuse.listdir(os.path.join(self.mounttmp, 'Unrestricted public data', 'GNU General Public License, version 3'))
         self.assertEqual(["GNU_General_Public_License,_version_3.pdf"], d3)
 
 
+def fuseModifyFileTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            d1 = llfuse.listdir(mounttmp)
+            self.assertEqual(["file1.txt"], d1)
+            with open(os.path.join(mounttmp, "file1.txt")) as f:
+                self.assertEqual("blub", f.read())
+    Test().runTest()
+
+def fuseModifyFileTestHelper2(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            d1 = llfuse.listdir(mounttmp)
+            self.assertEqual(["file1.txt"], d1)
+            with open(os.path.join(mounttmp, "file1.txt")) as f:
+                self.assertEqual("plnp", f.read())
+    Test().runTest()
+
+class FuseModifyFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        self.pool.apply(fuseModifyFileTestHelper1, (self.mounttmp,))
+
+        with collection.open("file1.txt", "w") as f:
+            f.write("plnp")
+
+        self.pool.apply(fuseModifyFileTestHelper2, (self.mounttmp,))
+
+
+class FuseAddFileToCollectionTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+        with collection.open("file2.txt", "w") as f:
+            f.write("plnp")
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+
+
+class FuseRemoveFileFromCollectionTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        with collection.open("file1.txt", "w") as f:
+            f.write("blub")
+
+        with collection.open("file2.txt", "w") as f:
+            f.write("plnp")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt", "file2.txt"], sorted(d1))
+
+        collection.remove("file2.txt")
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+
+def fuseCreateFileTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                pass
+    Test().runTest()
+
+class FuseCreateFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.assertNotIn("file1.txt", collection)
+
+        self.pool.apply(fuseCreateFileTestHelper, (self.mounttmp,))
+
+        self.assertIn("file1.txt", collection)
+
+        d1 = llfuse.listdir(self.mounttmp)
+        self.assertEqual(["file1.txt"], d1)
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. d41d8cd98f00b204e9800998ecf8427e\+0\+A[a-f0-9]{40}@[a-f0-9]{8} 0:0:file1\.txt$')
+
+
+def fuseWriteFileTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                f.write("Hello world!")
+    Test().runTest()
+
+def fuseWriteFileTestHelper2(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
+                self.assertEqual(f.read(), "Hello world!")
+    Test().runTest()
+
+class FuseWriteFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.assertNotIn("file1.txt", collection)
+
+        self.pool.apply(fuseWriteFileTestHelper1, (self.mounttmp,))
+
+        with collection.open("file1.txt") as f:
+            self.assertEqual(f.read(), "Hello world!")
+
+        self.pool.apply(fuseWriteFileTestHelper2, (self.mounttmp,))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+def fuseUpdateFileTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+            with open(os.path.join(mounttmp, "file1.txt"), "r+") as f:
+                fr = f.read()
+                self.assertEqual(fr, "Hello world!")
+                f.seek(0)
+                f.write("Hola mundo!")
+                f.seek(0)
+                fr = f.read()
+                self.assertEqual(fr, "Hola mundo!!")
+
+            with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
+                self.assertEqual(f.read(), "Hola mundo!!")
+
+    Test().runTest()
+
+class FuseUpdateFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        # Run in a separate process; see the GIL deadlock note in MountTestBase.setUp
+        self.pool.apply(fuseUpdateFileTestHelper, (self.mounttmp,))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. daaef200ebb921e011e3ae922dd3266b\+11\+A[a-f0-9]{40}@[a-f0-9]{8} 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:11:file1\.txt 22:1:file1\.txt$')
+
+
+def fuseMkdirTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with self.assertRaises(IOError):
+                with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                    f.write("Hello world!")
+
+            os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            with self.assertRaises(OSError):
+                os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            d1 = llfuse.listdir(mounttmp)
+            self.assertEqual(["testdir"], d1)
+
+            with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+            d1 = llfuse.listdir(os.path.join(mounttmp, "testdir"))
+            self.assertEqual(["file1.txt"], d1)
+
+    Test().runTest()
+
+class FuseMkdirTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.pool.apply(fuseMkdirTestHelper, (self.mounttmp,))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+def fuseRmTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+    Test().runTest()
+
+def fuseRmTestHelper2(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Can't delete because it's not empty
+            with self.assertRaises(OSError):
+                os.rmdir(os.path.join(mounttmp, "testdir"))
+
+            d1 = llfuse.listdir(os.path.join(mounttmp, "testdir"))
+            self.assertEqual(["file1.txt"], d1)
+
+            # Delete file
+            os.remove(os.path.join(mounttmp, "testdir", "file1.txt"))
+
+            # Make sure it's empty
+            d1 = llfuse.listdir(os.path.join(mounttmp, "testdir"))
+            self.assertEqual([], d1)
+
+            # Try to delete it again
+            with self.assertRaises(OSError):
+                os.remove(os.path.join(mounttmp, "testdir", "file1.txt"))
+
+    Test().runTest()
+
+def fuseRmTestHelper3(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            # Should be able to delete now that it is empty
+            os.rmdir(os.path.join(mounttmp, "testdir"))
+
+            # Make sure it's empty
+            d1 = llfuse.listdir(os.path.join(mounttmp))
+            self.assertEqual([], d1)
+
+            # Try to delete it again
+            with self.assertRaises(OSError):
+                os.rmdir(os.path.join(mounttmp, "testdir"))
+
+    Test().runTest()
+
+class FuseRmTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.pool.apply(fuseRmTestHelper1, (self.mounttmp,))
+
+        # Starting manifest
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+        self.pool.apply(fuseRmTestHelper2, (self.mounttmp,))
+
+        # Can't have empty directories :-( so manifest will be empty.
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+        self.pool.apply(fuseRmTestHelper3, (self.mounttmp,))
+
+        # manifest should be empty now.
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertEqual(collection2["manifest_text"], "")
+
+
+def fuseMvFileTestHelper1(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+    Test().runTest()
+
+def fuseMvFileTestHelper2(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            d1 = llfuse.listdir(os.path.join(mounttmp))
+            self.assertEqual(["testdir"], d1)
+            d1 = llfuse.listdir(os.path.join(mounttmp, "testdir"))
+            self.assertEqual(["file1.txt"], d1)
+
+            os.rename(os.path.join(mounttmp, "testdir", "file1.txt"), os.path.join(mounttmp, "file1.txt"))
+
+            d1 = llfuse.listdir(os.path.join(mounttmp))
+            self.assertEqual(["file1.txt", "testdir"], sorted(d1))
+            d1 = llfuse.listdir(os.path.join(mounttmp, "testdir"))
+            self.assertEqual([], d1)
+
+    Test().runTest()
+
+class FuseMvFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.pool.apply(fuseMvFileTestHelper1, (self.mounttmp,))
+
+        # Starting manifest
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+        self.pool.apply(fuseMvFileTestHelper2, (self.mounttmp,))
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+def fuseRenameTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.mkdir(os.path.join(mounttmp, "testdir"))
+
+            with open(os.path.join(mounttmp, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+    Test().runTest()
+
+class FuseRenameTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+        self.assertTrue(m.writable())
+
+        self.pool.apply(fuseRenameTestHelper, (self.mounttmp,))
+
+        # Starting manifest
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["testdir"], d1)
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir"))
+        self.assertEqual(["file1.txt"], d1)
+
+        os.rename(os.path.join(self.mounttmp, "testdir"), os.path.join(self.mounttmp, "testdir2"))
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["testdir2"], sorted(d1))
+        d1 = llfuse.listdir(os.path.join(self.mounttmp, "testdir2"))
+        self.assertEqual(["file1.txt"], d1)
+
+        collection2 = self.api.collections().get(uuid=collection.manifest_locator()).execute()
+        self.assertRegexpMatches(collection2["manifest_text"],
+            r'\./testdir2 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$')
+
+
+class FuseUpdateFromEventTest(MountTestBase):
+    def runTest(self):
+        # Debug logging is wanted for this test only: put the previous level back afterwards.
+        self.addCleanup(arvados.logger.setLevel, arvados.logger.level)
+        arvados.logger.setLevel(logging.DEBUG)
+
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        self.operations.listen_for_events(self.api)
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual([], sorted(d1))
+
+        with arvados.collection.Collection(collection.manifest_locator(), api_client=self.api) as collection2:
+            with collection2.open("file1.txt", "w") as f:
+                f.write("foo")
+
+        # The new file should show up via event bus notify; give it a moment to arrive.
+        time.sleep(1)
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual(["file1.txt"], sorted(d1))
+
+
+def fuseFileConflictTestHelper(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w") as f:
+                f.write("bar")
+
+            d1 = sorted(llfuse.listdir(os.path.join(mounttmp)))
+            self.assertEqual(len(d1), 2)
+
+            with open(os.path.join(mounttmp, "file1.txt"), "r") as f:
+                self.assertEqual(f.read(), "bar")
+
+            self.assertRegexpMatches(d1[1],
+                r'file1\.txt~conflict-\d\d\d\d-\d\d-\d\d-\d\d:\d\d:\d\d~')
+
+            with open(os.path.join(mounttmp, d1[1]), "r") as f:
+                self.assertEqual(f.read(), "foo")
+
+    Test().runTest()
+
+class FuseFileConflictTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        d1 = llfuse.listdir(os.path.join(self.mounttmp))
+        self.assertEqual([], sorted(d1))
+
+        with arvados.collection.Collection(collection.manifest_locator(), api_client=self.api) as collection2:
+            with collection2.open("file1.txt", "w") as f:
+                f.write("foo")
+
+        # Run in a separate process; see the GIL deadlock note in MountTestBase.setUp
+        self.pool.apply(fuseFileConflictTestHelper, (self.mounttmp,))
+
+
+def fuseUnlinkOpenFileTest(mounttmp):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, "file1.txt"), "w+") as f:
+                f.write("foo")
+
+                d1 = llfuse.listdir(os.path.join(mounttmp))
+                self.assertEqual(["file1.txt"], sorted(d1))
+
+                os.remove(os.path.join(mounttmp, "file1.txt"))
+
+                d1 = llfuse.listdir(os.path.join(mounttmp))
+                self.assertEqual([], sorted(d1))
+
+                f.seek(0)
+                self.assertEqual(f.read(), "foo")
+                f.write("bar")
+
+                f.seek(0)
+                self.assertEqual(f.read(), "foobar")
+
+    Test().runTest()
+
+class FuseUnlinkOpenFileTest(MountTestBase):
+    def runTest(self):
+        collection = arvados.collection.Collection(api_client=self.api)
+        collection.save_new()
+
+        m = self.make_mount(fuse.CollectionDirectory)
+        with llfuse.lock:
+            m.new_collection(collection.api_response(), collection)
+
+        # Run in a separate process; see the GIL deadlock note in MountTestBase.setUp
+        self.pool.apply(fuseUnlinkOpenFileTest, (self.mounttmp,))
+
+        self.assertEqual(collection.manifest_text(), "")
+
+
+def fuseMvFileBetweenCollectionsTest1(mounttmp, uuid1, uuid2):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            with open(os.path.join(mounttmp, uuid1, "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid1))
+            self.assertEqual(["file1.txt"], sorted(d1))
+            d1 = os.listdir(os.path.join(mounttmp, uuid2))
+            self.assertEqual([], sorted(d1))
+
+    Test().runTest()
+
+def fuseMvFileBetweenCollectionsTest2(mounttmp, uuid1, uuid2):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.rename(os.path.join(mounttmp, uuid1, "file1.txt"), os.path.join(mounttmp, uuid2, "file2.txt"))
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid1))
+            self.assertEqual([], sorted(d1))
+            d1 = os.listdir(os.path.join(mounttmp, uuid2))
+            self.assertEqual(["file2.txt"], sorted(d1))
+
+    Test().runTest()
+
+class FuseMvFileBetweenCollectionsTest(MountTestBase):
+    def runTest(self):
+        collection1 = arvados.collection.Collection(api_client=self.api)
+        collection1.save_new()
+
+        collection2 = arvados.collection.Collection(api_client=self.api)
+        collection2.save_new()
+
+        m = self.make_mount(fuse.MagicDirectory)
+
+        # Run in a separate process; see the GIL deadlock note in MountTestBase.setUp
+        self.pool.apply(fuseMvFileBetweenCollectionsTest1, (self.mounttmp,
+                                                  collection1.manifest_locator(),
+                                                  collection2.manifest_locator()))
+
+        collection1.update()
+        collection2.update()
+
+        self.assertRegexpMatches(collection1.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$")
+        self.assertEqual(collection2.manifest_text(), "")
+
+        self.pool.apply(fuseMvFileBetweenCollectionsTest2, (self.mounttmp,
+                                                  collection1.manifest_locator(),
+                                                  collection2.manifest_locator()))
+
+        collection1.update()
+        collection2.update()
+
+        self.assertEqual(collection1.manifest_text(), "")
+        self.assertRegexpMatches(collection2.manifest_text(), r"\. 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file2\.txt$")
+
+
+def fuseMvDirBetweenCollectionsTest1(mounttmp, uuid1, uuid2):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.mkdir(os.path.join(mounttmp, uuid1, "testdir"))
+            with open(os.path.join(mounttmp, uuid1, "testdir", "file1.txt"), "w") as f:
+                f.write("Hello world!")
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid1))
+            self.assertEqual(["testdir"], sorted(d1))
+            d1 = os.listdir(os.path.join(mounttmp, uuid1, "testdir"))
+            self.assertEqual(["file1.txt"], sorted(d1))
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid2))
+            self.assertEqual([], sorted(d1))
+
+    Test().runTest()
+
+
+def fuseMvDirBetweenCollectionsTest2(mounttmp, uuid1, uuid2):
+    class Test(unittest.TestCase):
+        def runTest(self):
+            os.rename(os.path.join(mounttmp, uuid1, "testdir"), os.path.join(mounttmp, uuid2, "testdir2"))
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid1))
+            self.assertEqual([], sorted(d1))
+
+            d1 = os.listdir(os.path.join(mounttmp, uuid2))
+            self.assertEqual(["testdir2"], sorted(d1))
+            d1 = os.listdir(os.path.join(mounttmp, uuid2, "testdir2"))
+            self.assertEqual(["file1.txt"], sorted(d1))
+
+            with open(os.path.join(mounttmp, uuid2, "testdir2", "file1.txt"), "r") as f:
+                self.assertEqual(f.read(), "Hello world!")
+
+    Test().runTest()
+
+class FuseMvDirBetweenCollectionsTest(MountTestBase):
+    """Move a directory between two mounted collections and check both manifests."""
+    def runTest(self):
+        collection1 = arvados.collection.Collection(api_client=self.api)
+        collection1.save_new()
+
+        collection2 = arvados.collection.Collection(api_client=self.api)
+        collection2.save_new()
+
+        mounted = self.make_mount(fuse.MagicDirectory)
+
+        def helper_args():
+            # Locators are looked up afresh for every helper invocation.
+            return (self.mounttmp,
+                    collection1.manifest_locator(),
+                    collection2.manifest_locator())
+
+        def refresh_both():
+            collection1.update()
+            collection2.update()
+
+        created_manifest = r"\./testdir 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$"
+        moved_manifest = r"\./testdir2 86fb269d190d2c85f6e0468ceca42a20\+12\+A[a-f0-9]{40}@[a-f0-9]{8} 0:12:file1\.txt$"
+
+        # Step 1: create the directory in the first collection.
+        # See comment in FuseWriteFileTest
+        self.pool.apply(fuseMvDirBetweenCollectionsTest1, helper_args())
+        refresh_both()
+
+        self.assertRegexpMatches(collection1.manifest_text(), created_manifest)
+        self.assertEqual(collection2.manifest_text(), "")
+
+        # Step 2: rename it into the second collection.
+        self.pool.apply(fuseMvDirBetweenCollectionsTest2, helper_args())
+        refresh_both()
+
+        self.assertEqual(collection1.manifest_text(), "")
+        self.assertRegexpMatches(collection2.manifest_text(), moved_manifest)
+
+
+def fuseProjectMkdirTestHelper1(mounttmp):
+    """Create "testcollection" under mounttmp; a repeated mkdir must raise OSError."""
+    class Test(unittest.TestCase):
+        def runTest(self):
+            target = os.path.join(mounttmp, "testcollection")
+            os.mkdir(target)
+            # The name is taken now, so creating it again has to fail.
+            self.assertRaises(OSError, os.mkdir, target)
+    Test().runTest()
+
+def fuseProjectMkdirTestHelper2(mounttmp):
+    """Exercise the removal rules for the "testcollection" directory.
+
+    rmdir must raise OSError while the directory holds a file, os.remove
+    must raise OSError when pointed at the directory itself, and rmdir
+    must succeed once the directory is empty.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            coll_dir = os.path.join(mounttmp, "testcollection")
+            coll_file = os.path.join(coll_dir, "file1.txt")
+
+            with open(coll_file, "w") as out:
+                out.write("Hello world!")
+
+            # Non-empty directory: rmdir is refused.
+            self.assertRaises(OSError, os.rmdir, coll_dir)
+
+            os.remove(coll_file)
+
+            # os.remove on the directory itself is refused; rmdir then works.
+            self.assertRaises(OSError, os.remove, coll_dir)
+            os.rmdir(coll_dir)
+    Test().runTest()
+
+class FuseProjectMkdirRmdirTest(MountTestBase):
+    """Check that mkdir and rmdir in a mounted project add and remove a listing entry."""
+    def runTest(self):
+        entry = 'testcollection'
+
+        self.make_mount(fuse.ProjectDirectory,
+                        project_object=self.api.users().current().execute())
+
+        # Nothing by that name exists before the helpers run.
+        self.assertNotIn(entry, llfuse.listdir(self.mounttmp))
+
+        # Helper 1 creates the directory.
+        self.pool.apply(fuseProjectMkdirTestHelper1, (self.mounttmp,))
+        self.assertIn(entry, llfuse.listdir(self.mounttmp))
+
+        # Helper 2 fills, empties and finally removes it again.
+        self.pool.apply(fuseProjectMkdirTestHelper2, (self.mounttmp,))
+        self.assertNotIn(entry, llfuse.listdir(self.mounttmp))
+
+
+def fuseProjectMvTestHelper1(mounttmp):
+    """Create "testcollection" and rename it into 'Unrestricted public data'.
+
+    Renaming onto the 'Unrestricted public data' entry itself must raise
+    OSError; renaming to a path inside it must succeed, after which the
+    entry is listed there and no longer listed under mounttmp.
+    """
+    class Test(unittest.TestCase):
+        def runTest(self):
+            name = 'testcollection'
+            source = os.path.join(mounttmp, "testcollection")
+            public_dir = os.path.join(mounttmp, 'Unrestricted public data')
+
+            self.assertNotIn(name, llfuse.listdir(mounttmp))
+
+            os.mkdir(source)
+
+            self.assertIn(name, llfuse.listdir(mounttmp))
+
+            # The destination entry already exists, so this rename is refused.
+            with self.assertRaises(OSError):
+                os.rename(source, public_dir)
+
+            os.rename(source, os.path.join(public_dir, 'testcollection'))
+
+            self.assertNotIn(name, llfuse.listdir(mounttmp))
+            self.assertIn(name, llfuse.listdir(public_dir))
+
+    Test().runTest()
+
+class FuseProjectMvTest(MountTestBase):
+    """Mount the current user's record as a ProjectDirectory and run the rename helper."""
+    def runTest(self):
+        current_user = self.api.users().current().execute()
+        self.make_mount(fuse.ProjectDirectory, project_object=current_user)
+
+        helper_args = (self.mounttmp,)
+        self.pool.apply(fuseProjectMvTestHelper1, helper_args)
+
+
 class FuseUnitTest(unittest.TestCase):
     def test_sanitize_filename(self):
         acceptable = [