def del_entry(self, entry):
+# Dispose of an inode once the kernel holds no references to it:
+# unmanage, drop from the table, finalize (outside the llfuse lock),
+# tell the kernel, then clear entry.inode.  Otherwise mark it dead
+# (presumably reaped later when the refcount drops -- confirm).
+# NOTE(review): the numbered step-by-step debug logging is removed now
+# that this path has been diagnosed.
if entry.ref_count == 0:
- _logger.debug("Deleting inode %i", entry.inode)
self.inode_cache.unmanage(entry)
- _logger.debug("(1) unmanaged inode %i", entry.inode)
-
del self._entries[entry.inode]
- _logger.debug("(2) deleted inode %i", entry.inode)
-
with llfuse.lock_released:
entry.finalize()
- _logger.debug("(3) finalized inode %i", entry.inode)
-
self.invalidate_inode(entry.inode)
- _logger.debug("(4) invalidated inode %i", entry.inode)
-
entry.inode = None
else:
entry.dead = True
_logger.debug("del_entry on inode %i with refcount %i", entry.inode, entry.ref_count)
def invalidate_inode(self, inode):
+# Invalidate the kernel cache entry immediately; the deferred
+# invalidation queue (do_invalidations, deleted below in this hunk) is
+# removed by this change.
- self.deferred_invalidations.append((inode,))
+ llfuse.invalidate_inode(inode)
def invalidate_entry(self, inode, name):
+# Immediately invalidate the kernel's dentry for (inode, name).
- self.deferred_invalidations.append((inode, name))
+ llfuse.invalidate_entry(inode, name)
- def do_invalidations(self):
- di = self.deferred_invalidations
- self.deferred_invalidations = []
-
- with llfuse.lock_released:
- for d in di:
- if len(d) == 1:
- llfuse.invalidate_inode(d[0])
- elif len(d) == 2:
- llfuse.invalidate_entry(d[0], d[1])
def catch_exceptions(orig_func):
"""Catch uncaught exceptions and log them consistently."""
raise
except EnvironmentError as e:
raise llfuse.FUSEError(e.errno)
+# Translate Keep storage failures into plain EIO here, centrally, so
+# individual FUSE operations no longer need their own try/except
+# around Keep reads and writes (see the simplified read() below).
+except arvados.errors.KeepWriteError as e:
+ _logger.error("Keep write error: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
+except arvados.errors.NotFoundError as e:
+ _logger.error("Block not found error: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
except:
_logger.exception("Unhandled exception during FUSE operation")
raise llfuse.FUSEError(errno.EIO)
return catch_exceptions_wrapper
+# deferred_invalidate is removed: invalidations now happen immediately
+# inside Inodes.invalidate_inode / invalidate_entry instead of being
+# queued and flushed after each decorated operation.
-def deferred_invalidate(orig_func):
- @functools.wraps(orig_func)
- def deferred_invalidate_wrapper(self, *args, **kwargs):
- n = orig_func(self, *args, **kwargs)
- self.inodes.do_invalidations()
- return n
- return deferred_invalidate_wrapper
class Operations(llfuse.Operations):
"""This is the main interface with llfuse.
self.events = None
for k,v in self.inodes.items():
- v.finalize()
+# Best-effort shutdown: log and continue so one failing finalize does
+# not leave the remaining inodes unfinalized.
+ try:
+ v.finalize()
+ except Exception as e:
+ _logger.exception("Error during finalize of inode %i", k)
self.inodes = None
def access(self, inode, mode, ctx):
itemparent.invalidate()
itemparent.update()
- self.inodes.do_invalidations()
@catch_exceptions
def getattr(self, inode):
entry.st_size = e.size()
- _logger.debug("getattr got size")
-
entry.st_blksize = 512
entry.st_blocks = (entry.st_size/512)+1
entry.st_atime = int(e.atime())
raise llfuse.FUSEError(errno.ENOENT)
@catch_exceptions
- @deferred_invalidate
def forget(self, inodes):
for inode, nlookup in inodes:
ent = self.inodes[inode]
self.inodes.touch(handle.obj)
- try:
- return handle.obj.readfrom(off, size, self.num_retries)
- except arvados.errors.NotFoundError as e:
- _logger.error("Block not found: " + str(e))
- raise llfuse.FUSEError(errno.EIO)
+# NotFoundError is now logged and mapped to EIO centrally by the
+# @catch_exceptions decorator, so no local handler is needed.
+ return handle.obj.readfrom(off, size, self.num_retries)
@catch_exceptions
def write(self, fh, off, buf):
return handle.obj.writeto(off, buf, self.num_retries)
@catch_exceptions
-@deferred_invalidate
def release(self, fh):
if fh in self._filehandles:
try:
self._filehandles[fh].flush()
- except EnvironmentError as e:
- raise llfuse.FUSEError(e.errno)
except Exception:
- _logger.exception("Flush error")
- self._filehandles[fh].release()
- del self._filehandles[fh]
+# Propagate flush errors (catch_exceptions maps them for the
+# kernel) but ALWAYS release and drop the handle, even on failure,
+# so a failed flush cannot leak the filehandle.
+ raise
+ finally:
+ self._filehandles[fh].release()
+ del self._filehandles[fh]
self.inodes.inode_cache.cap_cache()
def releasedir(self, fh):
self.release(fh)
@catch_exceptions
- @deferred_invalidate
def opendir(self, inode):
_logger.debug("arv-mount opendir: inode %i", inode)
return p
@catch_exceptions
- @deferred_invalidate
def create(self, inode_parent, name, mode, flags, ctx):
_logger.debug("arv-mount create: %i '%s' %o", inode_parent, name, mode)
return (fh, self.getattr(f.inode))
@catch_exceptions
- @deferred_invalidate
def mkdir(self, inode_parent, name, mode, ctx):
_logger.debug("arv-mount mkdir: %i '%s' %o", inode_parent, name, mode)
return self.getattr(d.inode)
@catch_exceptions
- @deferred_invalidate
def unlink(self, inode_parent, name):
_logger.debug("arv-mount unlink: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.unlink(name)
@catch_exceptions
- @deferred_invalidate
def rmdir(self, inode_parent, name):
_logger.debug("arv-mount rmdir: %i '%s'", inode_parent, name)
p = self._check_writable(inode_parent)
p.rmdir(name)
@catch_exceptions
- @deferred_invalidate
def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
_logger.debug("arv-mount rename: %i '%s' %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
src = self._check_writable(inode_parent_old)
dest.rename(name_old, name_new, src)
@catch_exceptions
- @deferred_invalidate
def flush(self, fh):
if fh in self._filehandles:
self._filehandles[fh].flush()
name = sanitize_filename(name)
_logger.debug("collection notify %s %s %s %s", event, collection, name, item)
with llfuse.lock:
- _logger.debug("on_event got llfuse.lock %s %s %s", event, collection, name)
if event == arvados.collection.ADD:
self.new_entry(name, item, self.mtime())
elif event == arvados.collection.DEL:
- _logger.debug("on_event (1) %s %s %s", event, collection, name)
ent = self._entries[name]
- _logger.debug("on_event (2) %s %s %s", event, collection, name)
del self._entries[name]
-
- _logger.debug("on_event (3) %s %s %s", event, collection, name)
-
+# Invalidate the kernel dentry before releasing our own reference.
self.inodes.invalidate_entry(self.inode, name.encode(self.inodes.encoding))
-
- _logger.debug("on_event (4) %s %s %s", event, collection, name)
-
self.inodes.del_entry(ent)
-
- _logger.debug("on_event (5) %s %s %s", event, collection, name)
elif event == arvados.collection.MOD:
+# On modification, invalidate via the cached fuse_entry when the item
+# carries one, otherwise fall back to a name lookup in our table.
if hasattr(item, "fuse_entry") and item.fuse_entry is not None:
self.inodes.invalidate_inode(item.fuse_entry.inode)
elif name in self._entries:
self.inodes.invalidate_inode(self._entries[name].inode)
- _logger.debug("on_event completed %s %s %s", event, collection, name)
def populate(self, mtime):
self._mtime = mtime
--- /dev/null
+from multiprocessing import Process
+import os
+import subprocess
+import sys
+import prof
+
+def fn(n):
+    # Deterministic test-file name for index n, e.g. fn(3) -> "file3".
+    return "file%i" % n
+
+def createfiles(d, n):
+    # Small-file stress test: create, list, read back, and remove files
+    # file<n>..file<n+9>, optionally inside a fresh subdirectory d
+    # ("" means work in the current directory).  Runs 4 iterations;
+    # mismatches are printed as ERROR lines rather than raised, so all
+    # iterations complete even after a failure.
+    for j in xrange(1, 5):
+        print "Starting small file %s %i, %i" % (d, n, j)
+        if d:
+            os.mkdir(d)
+            ld = os.listdir('.')
+            if d not in ld:
+                print "ERROR %s missing" % d
+            os.chdir(d)
+
+        # Create ten small files whose contents equal their names.
+        for i in xrange(n, n+10):
+            with open(fn(i), "w") as f:
+                f.write(fn(i))
+
+        # Every new file must show up in a directory listing.
+        ld = os.listdir('.')
+        for i in xrange(n, n+10):
+            if fn(i) not in ld:
+                print "ERROR %s missing" % fn(i)
+
+        # Read back and verify contents.
+        for i in xrange(n, n+10):
+            with open(fn(i), "r") as f:
+                if f.read() != fn(i):
+                    print "ERROR %s doesn't have expected contents" % fn(i)
+
+        for i in xrange(n, n+10):
+            os.remove(fn(i))
+
+        # Deleted files must no longer be listed.
+        ld = os.listdir('.')
+        for i in xrange(n, n+10):
+            if fn(i) in ld:
+                print "ERROR %s should have been removed" % fn(i)
+
+        # Tear down the per-worker subdirectory and verify it is gone.
+        if d:
+            os.chdir('..')
+            os.rmdir(d)
+            ld = os.listdir('.')
+            if d in ld:
+                print "ERROR %s should have been removed" % d
+
+
+def createbigfile(d, n):
+    # Large-file stress test: write a single file in 1000 sizeable
+    # chunks, verify it chunk by chunk, then remove it.  Directory
+    # handling and ERROR reporting mirror createfiles().
+    # NOTE(review): the inner loops reuse j from the outer loop; this is
+    # harmless (the outer for reassigns j each iteration) but a rename
+    # would be clearer.
+    for j in xrange(1, 5):
+        print "Starting big file %s %i, %i" % (d, n, j)
+        i = n
+        if d:
+            os.mkdir(d)
+            ld = os.listdir('.')
+            if d not in ld:
+                print "ERROR %s missing" % d
+            os.chdir(d)
+
+        # Each chunk is the chunk number + file name repeated 10000x,
+        # so every chunk's expected contents are reproducible on read.
+        with open(fn(i), "w") as f:
+            for j in xrange(0, 1000):
+                f.write((str(j) + fn(i)) * 10000)
+
+        ld = os.listdir('.')
+        if fn(i) not in ld:
+            print "ERROR %s missing" % fn(i)
+
+        # Read back exactly one chunk at a time and compare.
+        with open(fn(i), "r") as f:
+            for j in xrange(0, 1000):
+                expect = (str(j) + fn(i)) * 10000
+                if f.read(len(expect)) != expect:
+                    print "ERROR %s doesn't have expected contents" % fn(i)
+
+        os.remove(fn(i))
+
+        ld = os.listdir('.')
+        if fn(i) in ld:
+            print "ERROR %s should have been removed" % fn(i)
+
+        if d:
+            os.chdir('..')
+            os.rmdir(d)
+            ld = os.listdir('.')
+            if d in ld:
+                print "ERROR %s should have been removed" % d
+
+def do_ls():
+    # Concurrent reader: run `ls -l` 49 times with output discarded, to
+    # exercise readdir/getattr while other processes create and delete
+    # files in the same mount.
+    with open("/dev/null", "w") as nul:
+        for j in xrange(1, 50):
+            subprocess.call(["ls", "-l"], stdout=nul, stderr=nul)
+
+def runit(target, indir):
+    # Launch 20 worker processes running `target`, each with a disjoint
+    # file-index range (and its own "dir<n>" subdirectory when indir is
+    # true), plus one concurrent do_ls process; wait for all of them and
+    # then check that the working directory is empty again.
+    procs = []
+    for n in xrange(0, 20):
+        if indir:
+            p = Process(target=target, args=("dir%i" % n, n*10,))
+        else:
+            p = Process(target=target, args=("", n*10,))
+        p.start()
+        procs.append(p)
+
+    p = Process(target=do_ls, args=())
+    p.start()
+    procs.append(p)
+
+    for p in procs:
+        p.join()
+
+    if os.listdir('.'):
+        print "ERROR there are left over files in the directory"
+
+
+if __name__ == '__main__':
+    # Run from inside an empty (presumably arv-mounted -- confirm)
+    # directory; each scenario's wall time is reported by
+    # prof.CountTime().
+    if os.listdir('.'):
+        print "ERROR starting directory is not empty"
+        sys.exit()
+
+    print "Single directory small files"
+    with prof.CountTime():
+        runit(createfiles, False)
+
+    print "Separate directories small files"
+    with prof.CountTime():
+        runit(createfiles, True)
+
+    print "Single directory large files"
+    with prof.CountTime():
+        runit(createbigfile, False)
+
+    print "Separate directories large files"
+    with prof.CountTime():
+        runit(createbigfile, True)