From fb835ba018ae9f4509063e93f7054fdd23b6669d Mon Sep 17 00:00:00 2001 From: peter Date: Tue, 18 Feb 2014 22:36:01 +0000 Subject: [PATCH] Robustness improvements --- sdk/python/bin/arv-mount | 441 ++++++++++++++++++++++++++------------ sdk/python/bin/arv-mount2 | 239 --------------------- 2 files changed, 299 insertions(+), 381 deletions(-) delete mode 100755 sdk/python/bin/arv-mount2 diff --git a/sdk/python/bin/arv-mount b/sdk/python/bin/arv-mount index 667f36e815..d0fb5cae9b 100755 --- a/sdk/python/bin/arv-mount +++ b/sdk/python/bin/arv-mount @@ -1,158 +1,315 @@ #!/usr/bin/env python -import argparse -import hashlib import os -import re -import string import sys -import logging -import fuse + +import llfuse import errno import stat +import threading import arvados -import time - -class KeepMount(fuse.LoggingMixIn, fuse.Operations): - 'Read-only Keep mount.' - - def __init__(self): - self.arv = arvados.api('v1') - self.reader = None - self.collections = {} - self.audited = dict(read={}) - - def load_collection(self, uuid): - if uuid in self.collections: - return - now = time.time() - reader = arvados.CollectionReader(uuid) - files = {} - files[''] = dict( - stat=dict( - st_mode=(stat.S_IFDIR | 0755), st_ctime=now, - st_mtime=now, st_atime=now, st_nlink=2)) +import argparse +import pprint + +from time import time +from llfuse import FUSEError + +class Directory(object): + def __init__(self, parent_inode): + self.inode = None + self.parent_inode = parent_inode + self._entries = {} + + def __getitem__(self, item): + return self._entries[item] + + def __setitem__(self, key, item): + self._entries[key] = item + + def __iter__(self): + return self._entries.iterkeys() + + def items(self): + return self._entries.items() + + def __contains__(self, k): + return k in self._entries + + def size(self): + return 0 + +class MagicDirectory(Directory): + def __init__(self, parent_inode, ops): + super(MagicDirectory, self).__init__(parent_inode) + self.ops = ops + + def __contains__(self, k): + #print 'contains',k + if k in self._entries: + return True try: - for s in reader.all_streams(): - for f in s.all_files(): - path = re.sub(r'^\./', '', os.path.join(s.name(), f.name())) - files[path] = dict( - stat=dict( - st_mode=(stat.S_IFREG | 0444), - st_size=f.size(), st_nlink=1, - st_ctime=now, st_mtime=now, st_atime=now), - arv_file=f) - logger.debug("collection.load: %s: %s" % (uuid, path)) - except: - # TODO: propagate real error, don't assume ENOENT - raise fuse.FuseOSError(errno.ENOENT) - self.collections[uuid] = dict(reader=reader, files=files) - logger.info("collection.load %s" % uuid) + if arvados.Keep.get(k): + #print 'in keep' + return True + else: + #print 'not in keep' + return False + except Exception as e: + #print 'exception keep', e + return False + + def __getitem__(self, item): + if item not in self._entries: + collection = arvados.CollectionReader(arvados.Keep.get(item)) + self._entries[item] = self.ops.add_entry(Directory(self.inode)) + self.ops.load_collection(self._entries[item], collection) + return self._entries[item] + +class File(object): + def __init__(self, parent_inode, reader): + self.inode = None + self.parent_inode = parent_inode + self.reader = reader + + def size(self): + return self.reader.size() + +class FileHandle(object): + def __init__(self, fh, entry): + self.fh = fh + self.entry = entry - def setup_reader(self, path): - logger.debug("%s", path.split('/')) +class Operations(llfuse.Operations): + + def __init__(self, collection, uid, gid): + super(Operations, self).__init__() + #self.api = arvados.api('v1') + + # dict of inodes to collection entry + self.inodes = {} + + self.uid = uid + self.gid = gid + + #print "root_parent", root_parent, "llfuse.ROOT_INODE", llfuse.ROOT_INODE + + if collection: + self.inodes_counter = llfuse.ROOT_INODE + self.root = Directory(self.inodes_counter) + self.root.inode = self.inodes_counter + self.inodes[self.root.inode] = self.root + self.load_collection(self.root, collection) + else: + self.inodes_counter = llfuse.ROOT_INODE + self.root = MagicDirectory(self.inodes_counter, self) + self.root.inode = self.inodes_counter + self.inodes[self.root.inode] = self.root + + # dict of inode to filehandle + self._filehandles = {} + self._filehandles_counter = 1 + + def load_collection(self, parent_dir, collection): + for s in collection.all_streams(): + cwd = parent_dir + for part in s.name().split('/'): + if part != '' and part != '.': + if part not in cwd: + cwd[part] = self.add_entry(Directory(cwd.inode)) + cwd = cwd[part] + for k, v in s.files().items(): + cwd[k] = self.add_entry(File(cwd.inode, v)) + + def add_entry(self, entry): + self.inodes_counter += 1 + entry.inode = self.inodes_counter + self.inodes[entry.inode] = entry + return entry + + def access(self, inode, mode, ctx): return True + + def getattr(self, inode): + e = self.inodes[inode] + + entry = llfuse.EntryAttributes() + entry.st_ino = inode + entry.generation = 0 + entry.entry_timeout = 300 + entry.attr_timeout = 300 + + entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH + if isinstance(e, Directory): + entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR + else: + entry.st_mode |= stat.S_IFREG + + entry.st_nlink = 1 + entry.st_uid = self.uid + entry.st_gid = self.gid + entry.st_rdev = 0 + + entry.st_size = e.size() + + entry.st_blksize = 1024 + entry.st_blocks = e.size()/1024 + if e.size()/1024 != 0: + entry.st_blocks += 1 + entry.st_atime = 0 + entry.st_mtime = 0 + entry.st_ctime = 0 + + return entry + + def lookup(self, parent_inode, name): + #print "lookup: parent_inode", parent_inode, "name", name + inode = None + + if name == '.': + inode = parent_inode + else: + if parent_inode in self.inodes: + p = self.inodes[parent_inode] + if name == '..': + inode = p.parent_inode + elif name in p: + inode = p[name].inode + + if inode != None: + return self.getattr(inode) + else: + raise llfuse.FUSEError(errno.ENOENT) + + def open(self, inode, flags): + if inode in self.inodes: + p = self.inodes[inode] + else: + raise llfuse.FUSEError(errno.ENOENT) + + if (flags & os.O_WRONLY) or (flags & os.O_RDWR): + raise llfuse.FUSEError(errno.EROFS) + + if isinstance(p, Directory): + raise llfuse.FUSEError(errno.EISDIR) + + fh = self._filehandles_counter + self._filehandles_counter += 1 + self._filehandles[fh] = FileHandle(fh, p) + return fh + + def read(self, fh, off, size): + #print "read", fh, off, size + if fh in self._filehandles: + handle = self._filehandles[fh] + else: + raise llfuse.FUSEError(errno.EBADF) + + try: + with llfuse.lock_released: + return handle.entry.reader.readfrom(off, size) + except: + raise llfuse.FUSEError(errno.EIO) + + def release(self, fh): + if fh in self._filehandles: + del self._filehandles[fh] + + def opendir(self, inode): + #print "opendir: inode", inode + + if inode in self.inodes: + p = self.inodes[inode] + else: + raise llfuse.FUSEError(errno.ENOENT) + + if not isinstance(p, Directory): + raise llfuse.FUSEError(errno.ENOTDIR) + + fh = self._filehandles_counter + self._filehandles_counter += 1 + if p.parent_inode in self.inodes: + parent = self.inodes[p.parent_inode] + else: + parent = None + self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items())) + return fh + + def readdir(self, fh, off): + #print "readdir: fh", fh, "off", off + + if fh in self._filehandles: + handle = self._filehandles[fh] + else: + raise llfuse.FUSEError(errno.EBADF) + + #print "handle.entry", handle.entry + + e = off + while e < len(handle.entry): + yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1) + e += 1 + + def releasedir(self, fh): + del self._filehandles[fh] + + def statfs(self): + st = llfuse.StatvfsData() + st.f_bsize = 4096 + st.f_blocks = 0 + st.f_files = 0 + + st.f_bfree = 0 + st.f_bavail = 0 + + st.f_ffree = 0 + st.f_favail = 0 + + st.f_frsize = 0 + return st + + def create(self, p1, p2, p3, p4, p5): + raise llfuse.FUSEError(errno.EROFS) + + def rename(self, inode_parent_old, name_old, inode_parent_new, name_new): + raise llfuse.FUSEError(errno.EROFS) + + def unlink(self, parent_inode, name): + raise llfuse.FUSEError(errno.EROFS) + + def rmdir(self, parent_inode, name): + raise llfuse.FUSEError(errno.EROFS) + + def symlink(self, inode_parent, name, target, ctx): + raise llfuse.FUSEError(errno.EROFS) + + +if __name__ == '__main__': - def set_args(self, args): - self.args = args - - def parse_and_load(self, path): - parts = path.split(os.path.sep, 2) - while len(parts) < 3: - parts += [''] - if not re.match(r'[0-9a-f]{32,}(\+\S+?)*', parts[1]): - raise fuse.FuseOSError(errno.ENOENT) - if self.args.collection != []: - if parts[1] not in self.args.collection: - raise fuse.FuseOSError(errno.EPERM) - self.load_collection(parts[1]) - return parts[0:3] - - def audit_read(self, uuid): - if self.args.audit and uuid not in self.audited['read']: - self.audited['read'][uuid] = True - logger.info("collection.read %s" % uuid) - - def read(self, path, size, offset, fh): - _, uuid, target = self.parse_and_load(path) - if (uuid not in self.collections or - target not in self.collections[uuid]['files']): - raise fuse.FuseOSError(errno.ENOENT) - self.audit_read(uuid) - f = self.collections[uuid]['files'][target]['arv_file'] - f.seek(offset) - return f.read(size) - - def readdir(self, path, fh): - if path == '/': - raise fuse.FuseOSError(errno.EPERM) - _, uuid, target = self.parse_and_load(path) - if uuid not in self.collections: - raise fuse.FuseOSError(errno.ENOENT) - if target != '' and target[-1] != os.path.sep: - target += os.path.sep - dirs = {} - for filepath in self.collections[uuid]['files']: - if filepath != '': - logger.debug(filepath) - if target == '' or 0 == string.find(filepath, target): - dirs[filepath[len(target):].split(os.path.sep)[0]] = True - return ['.', '..'] + dirs.keys() - - def getattr(self, path, fh=None): - if path == '/': - now = time.time() - return dict(st_mode=(stat.S_IFDIR | 0111), st_ctime=now, - st_mtime=now, st_atime=now, st_nlink=2) - _, uuid, target = self.parse_and_load(path) - if uuid not in self.collections: - raise fuse.FuseOSError(errno.ENOENT) - if target in self.collections[uuid]['files']: - return self.collections[uuid]['files'][target]['stat'] - for filepath in self.collections[uuid]['files']: - if filepath != '': - if target == '' or 0 == string.find(filepath, target + '/'): - return self.collections[uuid]['files']['']['stat'] - raise fuse.FuseOSError(errno.ENOENT) - -def parse_args(): parser = argparse.ArgumentParser( description='Mount Keep data under the local filesystem.') - parser.add_argument('mountpoint', type=str, - help=""" -Mount point. -""") - parser.add_argument('--collection', type=str, action='append', default=[], - help=""" -Collection locator. If none supplied, provide access to all readable -manifests. -""") - parser.add_argument('--audit', action='store_true', - help=""" -Print the collection uuid on stderr the first time a given collection -is read. -""") - parser.add_argument('--debug', action='store_true', - help=""" -Print debug messages. -""") - parser.add_argument('--foreground', action='store_true', - help=""" -Run in foreground, instead of detaching and running as a daemon. -""") + parser.add_argument('mountpoint', type=str, help="""Mount point.""") + parser.add_argument('--collection', type=str, help="""Collection locator""") + parser.add_argument('--debug', action='store_true', help="""Debug mode""") + args = parser.parse_args() - return args -if __name__ == '__main__': - args = parse_args() - logger = logging.getLogger(os.path.basename(sys.argv[0])) - if args.audit: - logging.basicConfig(level=logging.INFO) + if args.collection != None: + collection = arvados.CollectionReader(arvados.Keep.get(args.collection)) + else: + collection = None + + operations = Operations(collection, os.getuid(), os.getgid()) + + opts = [] if args.debug: - logging.basicConfig(level=logging.DEBUG) - mounter = KeepMount() - mounter.set_args(args) - fuse = fuse.FUSE(mounter, - args.mountpoint, - foreground=args.foreground, - fsname='arv-mount') + opts += ['debug'] + + llfuse.init(operations, args.mountpoint, opts) + + try: + llfuse.main() + except: + llfuse.close(unmount=True) + raise + + llfuse.close() diff --git a/sdk/python/bin/arv-mount2 b/sdk/python/bin/arv-mount2 deleted file mode 100755 index c349bdba41..0000000000 --- a/sdk/python/bin/arv-mount2 +++ /dev/null @@ -1,239 +0,0 @@ -#!/usr/bin/env python - -import os -import sys - -import llfuse -import errno -import stat -import threading -import arvados -import argparse -import pprint - -from time import time -from llfuse import FUSEError - -class Directory(object): - def __init__(self, inode, parent): - self.inode = inode - self.parent = parent - self.entries = {} - - def __getitem__(self, item): - return self.entries[item] - - def __setitem__(self, key, item): - self.entries[key] = item - - def __iter__(self): - return self.entries.iterkeys() - - def __contains__(self, k): - return k in self.entries - - def size(self): - return 0 - -class File(object): - def __init__(self, inode, parent, reader): - self.inode = inode - self.parent = parent - self.reader = reader - - def size(self): - return self.reader.size() - -class FileHandle(object): - def __init__(self, fh, entry): - self.fh = fh - self.entry = entry - -class Operations(llfuse.Operations): - - def __init__(self, collection, uid, gid): - super(Operations, self).__init__() - #self.api = arvados.api('v1') - - # dict of inodes to collection entry - self._inodes = {} - - self.uid = uid - self.gid = gid - - #print "root_parent", root_parent, "llfuse.ROOT_INODE", llfuse.ROOT_INODE - - i = llfuse.ROOT_INODE - self.root = Directory(i, i) - self._inodes[i] = self.root - - for s in collection.all_streams(): - cwd = self.root - for part in s.name().split('/'): - if part != '' and part != '.': - if part not in cwd: - i += 1 - cwd[part] = Directory(i, cwd.inode) - self._inodes[i] = cwd[part] - cwd = cwd[part] - for k, v in s.files().items(): - i += 1 - cwd[k] = File(i, cwd.inode, v) - self._inodes[i] = cwd[k] - - # dict of inode to filehandle - self._filehandles = {} - self._filehandles_counter = 1 - - def access(self, inode, mode, ctx): - return True - - def getattr(self, inode): - e = self._inodes[inode] - - entry = llfuse.EntryAttributes() - entry.st_ino = inode - entry.generation = 0 - entry.entry_timeout = 300 - entry.attr_timeout = 300 - - entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH - if isinstance(e, Directory): - entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR - else: - entry.st_mode |= stat.S_IFREG - - entry.st_nlink = 1 - entry.st_uid = self.uid - entry.st_gid = self.gid - entry.st_rdev = 0 - - entry.st_size = e.size() - - entry.st_blksize = 1024 - entry.st_blocks = e.size()/1024 - if e.size()/1024 != 0: - entry.st_blocks += 1 - entry.st_atime = 0 - entry.st_mtime = 0 - entry.st_ctime = 0 - - return entry - - def lookup(self, parent_inode, name): - #print "lookup: parent_inode", parent_inode, "name", name - inode = None - - if name == '.': - inode = parent_inode - else: - if parent_inode in self._inodes: - p = self._inodes[parent_inode] - if name == '..': - inode = p.parent - elif name in p: - inode = p[name].inode - - if inode != None: - return self.getattr(inode) - else: - raise llfuse.FUSEError(errno.ENOENT) - - def open(self, inode, flags): - if inode in self._inodes: - p = self._inodes[inode] - else: - raise llfuse.FUSEError(errno.ENOENT) - - if isinstance(p, Directory): - raise llfuse.FUSEError(errno.EISDIR) - - fh = self._filehandles_counter - self._filehandles_counter += 1 - self._filehandles[fh] = FileHandle(fh, p) - return fh - - def read(self, fh, off, size): - #print "read", fh, off, size - if fh in self._filehandles: - handle = self._filehandles[fh] - else: - raise llfuse.FUSEError(errno.EBADF) - - try: - with llfuse.lock_released: - return handle.entry.reader.readfrom(off, size) - except: - raise llfuse.FUSEError(errno.EIO) - - def release(self, fh): - if fh in self._filehandles: - del self._filehandles[fh] - - def opendir(self, inode): - #print "opendir: inode", inode - - if inode in self._inodes: - p = self._inodes[inode] - else: - raise llfuse.FUSEError(errno.ENOENT) - - if not isinstance(p, Directory): - raise llfuse.FUSEError(errno.ENOTDIR) - - fh = self._filehandles_counter - self._filehandles_counter += 1 - if p.parent in self._inodes: - parent = self._inodes[p.parent] - else: - parent = None - self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.entries.items())) - return fh - - def readdir(self, fh, off): - #print "readdir: fh", fh, "off", off - - if fh in self._filehandles: - handle = self._filehandles[fh] - else: - raise llfuse.FUSEError(errno.EBADF) - - #print "handle.entry", handle.entry - - e = off - while e < len(handle.entry): - yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1) - e += 1 - - def releasedir(self, fh): - del self._filehandles[fh] - -if __name__ == '__main__': - - parser = argparse.ArgumentParser( - description='Mount Keep data under the local filesystem.') - parser.add_argument('mountpoint', type=str, help="""Mount point.""") - parser.add_argument('--collection', type=str, required=True, help="""Collection locator""") - parser.add_argument('--debug', action='store_true', help="""Debug mode""") - - args = parser.parse_args() - - # for testing only! - #manifest = open('/home/peter/arvados/sdk/python/testdata/jlake_manifest').read() - #operations = Operations(arvados.CollectionReader(manifest), os.getuid(), os.getgid()) - - operations = Operations(arvados.CollectionReader(arvados.Keep.get(args.collection)), os.getuid(), os.getgid()) - - opts = [] - if args.debug: - opts += ['debug'] - - llfuse.init(operations, args.mountpoint, opts) - - try: - llfuse.main() - except: - llfuse.close(unmount=True) - raise - - llfuse.close() -- 2.30.2