3198: concurrency test and associated fixes
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 18 Jun 2015 20:32:21 +0000 (16:32 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 18 Jun 2015 20:32:21 +0000 (16:32 -0400)
sdk/python/arvados/collection.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/arvados_fuse/fusefile.py
services/fuse/tests/fstest.py [new file with mode: 0644]
services/fuse/tests/prof.py [new file with mode: 0644]

index fda1b63de86a9c73e154281245ec2b434cd13da0..38e794c24a217ffa5c76c1a7b026e4d432d369a0 100644 (file)
@@ -1215,7 +1215,6 @@ class Collection(RichCollectionBase):
                 # We've merged this record this before.  Don't do anything.
                 return
             else:
-                _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
@@ -1246,7 +1245,6 @@ class Collection(RichCollectionBase):
 
     def _remember_api_response(self, response):
         self._api_response = response
-        _logger.debug("Remembering %s %s", response.get("modified_at"), response.get("portable_data_hash"))
         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
 
     def _populate_from_api_server(self):
index eba17b37c35c420bbc476850bc813a72a9c2cc9c..cab44dd5798bad4b81a3f1b99c6f35cb5ac69c48 100644 (file)
@@ -227,41 +227,22 @@ class Inodes(object):
 
     def del_entry(self, entry):
         if entry.ref_count == 0:
-            _logger.debug("Deleting inode %i", entry.inode)
             self.inode_cache.unmanage(entry)
-            _logger.debug("(1) unmanaged inode %i", entry.inode)
-
             del self._entries[entry.inode]
-            _logger.debug("(2) deleted inode %i", entry.inode)
-
             with llfuse.lock_released:
                 entry.finalize()
-                _logger.debug("(3) finalized inode %i", entry.inode)
-
             self.invalidate_inode(entry.inode)
-            _logger.debug("(4) invalidated inode %i", entry.inode)
-
             entry.inode = None
         else:
             entry.dead = True
             _logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
 
     def invalidate_inode(self, inode):
-        self.deferred_invalidations.append((inode,))
+        llfuse.invalidate_inode(inode)
 
     def invalidate_entry(self, inode, name):
-        self.deferred_invalidations.append((inode, name))
+        llfuse.invalidate_entry(inode, name)
 
-    def do_invalidations(self):
-        di = self.deferred_invalidations
-        self.deferred_invalidations = []
-
-        with llfuse.lock_released:
-            for d in di:
-                if len(d) == 1:
-                    llfuse.invalidate_inode(d[0])
-                elif len(d) == 2:
-                    llfuse.invalidate_entry(d[0], d[1])
 
 def catch_exceptions(orig_func):
     """Catch uncaught exceptions and log them consistently."""
@@ -274,19 +255,18 @@ def catch_exceptions(orig_func):
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
+        except arvados.errors.KeepWriteError as e:
+            _logger.error("Keep write error: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
+        except arvados.errors.NotFoundError as e:
+            _logger.error("Block not found error: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
         except:
             _logger.exception("Unhandled exception during FUSE operation")
             raise llfuse.FUSEError(errno.EIO)
 
     return catch_exceptions_wrapper
 
-def deferred_invalidate(orig_func):
-    @functools.wraps(orig_func)
-    def deferred_invalidate_wrapper(self, *args, **kwargs):
-        n = orig_func(self, *args, **kwargs)
-        self.inodes.do_invalidations()
-        return n
-    return deferred_invalidate_wrapper
 
 class Operations(llfuse.Operations):
     """This is the main interface with llfuse.
@@ -334,7 +314,10 @@ class Operations(llfuse.Operations):
             self.events = None
 
         for k,v in self.inodes.items():
-            v.finalize()
+            try:
+                v.finalize()
+            except Exception as e:
+                _logger.exception("Error during finalize of inode %i", k)
         self.inodes = None
 
     def access(self, inode, mode, ctx):
@@ -375,7 +358,6 @@ class Operations(llfuse.Operations):
                     itemparent.invalidate()
                     itemparent.update()
 
-                self.inodes.do_invalidations()
 
     @catch_exceptions
     def getattr(self, inode):
@@ -408,8 +390,6 @@ class Operations(llfuse.Operations):
 
         entry.st_size = e.size()
 
-        _logger.debug("getattr got size")
-
         entry.st_blksize = 512
         entry.st_blocks = (entry.st_size/512)+1
         entry.st_atime = int(e.atime())
@@ -457,7 +437,6 @@ class Operations(llfuse.Operations):
             raise llfuse.FUSEError(errno.ENOENT)
 
     @catch_exceptions
-    @deferred_invalidate
     def forget(self, inodes):
         for inode, nlookup in inodes:
             ent = self.inodes[inode]
@@ -493,11 +472,7 @@ class Operations(llfuse.Operations):
 
         self.inodes.touch(handle.obj)
 
-        try:
-            return handle.obj.readfrom(off, size, self.num_retries)
-        except arvados.errors.NotFoundError as e:
-            _logger.error("Block not found: " + str(e))
-            raise llfuse.FUSEError(errno.EIO)
+        return handle.obj.readfrom(off, size, self.num_retries)
 
     @catch_exceptions
     def write(self, fh, off, buf):
@@ -515,24 +490,21 @@ class Operations(llfuse.Operations):
         return handle.obj.writeto(off, buf, self.num_retries)
 
     @catch_exceptions
-    @deferred_invalidate
     def release(self, fh):
         if fh in self._filehandles:
             try:
                 self._filehandles[fh].flush()
-            except EnvironmentError as e:
-                raise llfuse.FUSEError(e.errno)
             except Exception:
-                _logger.exception("Flush error")
-            self._filehandles[fh].release()
-            del self._filehandles[fh]
+                raise
+            finally:
+                self._filehandles[fh].release()
+                del self._filehandles[fh]
         self.inodes.inode_cache.cap_cache()
 
     def releasedir(self, fh):
         self.release(fh)
 
     @catch_exceptions
-    @deferred_invalidate
     def opendir(self, inode):
         _logger.debug("arv-mount opendir: inode %i", inode)
 
@@ -607,7 +579,6 @@ class Operations(llfuse.Operations):
         return p
 
     @catch_exceptions
-    @deferred_invalidate
     def create(self, inode_parent, name, mode, flags, ctx):
         _logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
 
@@ -624,7 +595,6 @@ class Operations(llfuse.Operations):
         return (fh, self.getattr(f.inode))
 
     @catch_exceptions
-    @deferred_invalidate
     def mkdir(self, inode_parent, name, mode, ctx):
         _logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
 
@@ -638,21 +608,18 @@ class Operations(llfuse.Operations):
         return self.getattr(d.inode)
 
     @catch_exceptions
-    @deferred_invalidate
     def unlink(self, inode_parent, name):
         _logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
 
     @catch_exceptions
-    @deferred_invalidate
     def rmdir(self, inode_parent, name):
         _logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
 
     @catch_exceptions
-    @deferred_invalidate
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
         _logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
@@ -660,7 +627,6 @@ class Operations(llfuse.Operations):
         dest.rename(name_old, name_new, src)
 
     @catch_exceptions
-    @deferred_invalidate
     def flush(self, fh):
         if fh in self._filehandles:
             self._filehandles[fh].flush()
index 50ea0eaa3f07268ffd770996379e41203fc43c8e..16b3bb2cdb53c80a40166bea4b6ab4e816435a90 100644 (file)
@@ -238,30 +238,18 @@ class CollectionDirectoryBase(Directory):
             name = sanitize_filename(name)
             _logger.debug("collection notify %s %s %s %s", event, collection, name, item)
             with llfuse.lock:
-                _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
                 if event == arvados.collection.ADD:
                     self.new_entry(name, item, self.mtime())
                 elif event == arvados.collection.DEL:
-                    _logger.debug("on_event (1) %s %s %s", event, collection, name)
                     ent = self._entries[name]
-                    _logger.debug("on_event (2) %s %s %s", event, collection, name)
                     del self._entries[name]
-
-                    _logger.debug("on_event (3) %s %s %s", event, collection, name)
-
                     self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
-
-                    _logger.debug("on_event (4) %s %s %s", event, collection, name)
-
                     self.inodes.del_entry(ent)
-
-                    _logger.debug("on_event (5) %s %s %s", event, collection, name)
                 elif event == arvados.collection.MOD:
                     if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
                         self.inodes.invalidate_inode(item.fuse_entry.inode)
                     elif name in self._entries:
                         self.inodes.invalidate_inode(self._entries[name].inode)
-                _logger.debug("on_event completed %s %s %s", event, collection, name)
 
     def populate(self, mtime):
         self._mtime = mtime
index e61ba54bf681b6ef1f42ec882a72e0ac1999be60..4d472cff1cca38380d80afa63b9783027ad1db30 100644 (file)
@@ -46,9 +46,7 @@ class FuseArvadosFile(File):
         self.arvfile = arvfile
 
     def size(self):
-        _logger.debug("started calling self.arvfile.size()")
         with llfuse.lock_released:
-            _logger.debug("locked_released and calling self.arvfile.size()")
             return self.arvfile.size()
 
     def readfrom(self, off, size, num_retries=0):
diff --git a/services/fuse/tests/fstest.py b/services/fuse/tests/fstest.py
new file mode 100644 (file)
index 0000000..cf081b7
--- /dev/null
@@ -0,0 +1,133 @@
+from multiprocessing import Process
+import os
+import subprocess
+import sys
+import prof
+
def fn(n):
    """Return the canonical test file name for index *n* (e.g. ``file7``)."""
    return "file{}".format(n)
+
def createfiles(d, n):
    """Exercise small-file create/list/read/delete cycles, 4 times over.

    If *d* is non-empty, each iteration creates directory *d*, chdirs into
    it, works there, and removes it again.  Each iteration writes, lists,
    reads back, and deletes files file<n> .. file<n+9>.  Problems are
    reported by printing "ERROR ..." lines (this is a concurrency load
    script, not a unittest, so it must keep going after a failure).
    """
    # Hoist the name list: the original rebuilt it via xrange in five
    # separate loops.  range/print(...) keep this valid on Python 2 and 3.
    names = [fn(i) for i in range(n, n + 10)]
    for j in range(1, 5):
        print("Starting small file %s %i, %i" % (d, n, j))
        if d:
            os.mkdir(d)
            if d not in os.listdir('.'):
                print("ERROR %s missing" % d)
            os.chdir(d)

        # Create the files, each containing its own name.
        for name in names:
            with open(name, "w") as f:
                f.write(name)

        # All of them must show up in a directory listing.
        ld = os.listdir('.')
        for name in names:
            if name not in ld:
                print("ERROR %s missing" % name)

        # Read back and verify contents.
        for name in names:
            with open(name, "r") as f:
                if f.read() != name:
                    print("ERROR %s doesn't have expected contents" % name)

        for name in names:
            os.remove(name)

        # None of them may remain visible after removal.
        ld = os.listdir('.')
        for name in names:
            if name in ld:
                print("ERROR %s should have been removed" % name)

        if d:
            os.chdir('..')
            os.rmdir(d)
            if d in os.listdir('.'):
                print("ERROR %s should have been removed" % d)
+
+
def createbigfile(d, n):
    """Exercise a large-file write/read/delete cycle, 4 times over.

    If *d* is non-empty, each iteration creates directory *d*, chdirs into
    it, works there, and removes it again.  Each iteration streams 1000
    chunks of ``(str(k) + name) * 10000`` into one file, verifies it can be
    read back chunk by chunk, then deletes it.  Problems are reported by
    printing "ERROR ..." lines.
    """
    name = fn(n)
    for j in range(1, 5):
        print("Starting big file %s %i, %i" % (d, n, j))
        if d:
            os.mkdir(d)
            if d not in os.listdir('.'):
                print("ERROR %s missing" % d)
            os.chdir(d)

        # NOTE: the original reused `j` for this inner loop, shadowing the
        # iteration counter; `k` keeps the two indices distinct.
        with open(name, "w") as f:
            for k in range(0, 1000):
                f.write((str(k) + name) * 10000)

        if name not in os.listdir('.'):
            print("ERROR %s missing" % name)

        # Read back exactly chunk-sized pieces and compare.
        with open(name, "r") as f:
            for k in range(0, 1000):
                expect = (str(k) + name) * 10000
                if f.read(len(expect)) != expect:
                    print("ERROR %s doesn't have expected contents" % name)

        os.remove(name)

        if name in os.listdir('.'):
            print("ERROR %s should have been removed" % name)

        if d:
            os.chdir('..')
            os.rmdir(d)
            if d in os.listdir('.'):
                print("ERROR %s should have been removed" % d)
+
def do_ls():
    """Hammer the mount with 49 concurrent-ish ``ls -l`` invocations.

    Output is discarded; the point is to generate readdir/getattr traffic
    while the file-creating workers run.  Uses os.devnull instead of a
    hard-coded "/dev/null" for portability.
    """
    with open(os.devnull, "w") as nul:
        for _ in range(1, 50):
            subprocess.call(["ls", "-l"], stdout=nul, stderr=nul)
+
def runit(target, indir):
    """Run *target* in 20 worker processes, plus one concurrent `ls` worker.

    Worker *i* gets args ("dir<i>", i*10) when *indir* is true, otherwise
    ("", i*10).  Blocks until every process has finished, then checks that
    the workers cleaned up after themselves.
    """
    workers = []
    for idx in range(0, 20):
        dirname = "dir%i" % idx if indir else ""
        worker = Process(target=target, args=(dirname, idx * 10,))
        worker.start()
        workers.append(worker)

    lister = Process(target=do_ls, args=())
    lister.start()
    workers.append(lister)

    for worker in workers:
        worker.join()

    if os.listdir('.'):
        print("ERROR there are left over files in the directory")
+
+
if __name__ == '__main__':
    # Refuse to start in a non-empty directory: the workers assume they
    # own the current directory's contents.
    if os.listdir('.'):
        print("ERROR starting directory is not empty")
        sys.exit()

    # (label, worker function, one-directory-per-worker?)
    scenarios = [
        ("Single directory small files", createfiles, False),
        ("Separate directories small files", createfiles, True),
        ("Single directory large files", createbigfile, False),
        ("Separate directories large files", createbigfile, True),
    ]
    for label, worker, separate in scenarios:
        print(label)
        with prof.CountTime():
            runit(worker, separate)
diff --git a/services/fuse/tests/prof.py b/services/fuse/tests/prof.py
new file mode 100644 (file)
index 0000000..49b9f24
--- /dev/null
@@ -0,0 +1,17 @@
+import time
+
class CountTime(object):
    """Context manager that times its body and prints an elapsed-time report.

    *tag* is prepended to the report line; if *size* is given (e.g. a byte
    count), a "throughput <size/sec>/sec" figure is appended.
    """

    def __init__(self, tag="", size=None):
        self.tag = tag      # label included in the report line
        self.size = size    # optional payload size used for throughput

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        sec = time.time() - self.start
        th = ""
        if self.size:
            th = "throughput %s/sec" % (self.size / sec)
        # sec is in seconds; *1e6 converts to microseconds.
        # Fixes the "micoseconds" typo in the report string.
        print("%s time %s microseconds %s" % (self.tag, sec * 1000000, th))