#!/usr/bin/env python
-import os
-import sys
-
-import llfuse
-import errno
-import stat
-import threading
+from arvados.fuse import *
import arvados
+import subprocess
import argparse
-import pprint
-
-from time import time
-from llfuse import FUSEError
-
-class Directory(object):
- def __init__(self, parent_inode):
- self.inode = None
- self.parent_inode = parent_inode
- self._entries = {}
-
- def __getitem__(self, item):
- return self._entries[item]
-
- def __setitem__(self, key, item):
- self._entries[key] = item
-
- def __iter__(self):
- return self._entries.iterkeys()
-
- def items(self):
- return self._entries.items()
-
- def __contains__(self, k):
- return k in self._entries
-
- def size(self):
- return 0
-
-class MagicDirectory(Directory):
- def __init__(self, parent_inode, ops):
- super(MagicDirectory, self).__init__(parent_inode)
- self.ops = ops
-
- def __contains__(self, k):
- #print 'contains',k
- if k in self._entries:
- return True
- try:
- if arvados.Keep.get(k):
- #print 'in keep'
- return True
- else:
- #print 'not in keep'
- return False
- except Exception as e:
- #print 'exception keep', e
- return False
-
- def __getitem__(self, item):
- if item not in self._entries:
- collection = arvados.CollectionReader(arvados.Keep.get(item))
- self._entries[item] = self.ops.add_entry(Directory(self.inode))
- self.ops.load_collection(self._entries[item], collection)
- return self._entries[item]
-
-class File(object):
- def __init__(self, parent_inode, reader):
- self.inode = None
- self.parent_inode = parent_inode
- self.reader = reader
-
- def size(self):
- return self.reader.size()
-
-class FileHandle(object):
- def __init__(self, fh, entry):
- self.fh = fh
- self.entry = entry
-
-class Operations(llfuse.Operations):
-
- def __init__(self, collection, uid, gid):
- super(Operations, self).__init__()
- #self.api = arvados.api('v1')
-
- # dict of inodes to collection entry
- self.inodes = {}
-
- self.uid = uid
- self.gid = gid
-
- #print "root_parent", root_parent, "llfuse.ROOT_INODE", llfuse.ROOT_INODE
-
- if collection:
- self.inodes_counter = llfuse.ROOT_INODE
- self.root = Directory(self.inodes_counter)
- self.root.inode = self.inodes_counter
- self.inodes[self.root.inode] = self.root
- self.load_collection(self.root, collection)
- else:
- self.inodes_counter = llfuse.ROOT_INODE
- self.root = MagicDirectory(self.inodes_counter, self)
- self.root.inode = self.inodes_counter
- self.inodes[self.root.inode] = self.root
-
- # dict of inode to filehandle
- self._filehandles = {}
- self._filehandles_counter = 1
-
- def load_collection(self, parent_dir, collection):
- for s in collection.all_streams():
- cwd = parent_dir
- for part in s.name().split('/'):
- if part != '' and part != '.':
- if part not in cwd:
- cwd[part] = self.add_entry(Directory(cwd.inode))
- cwd = cwd[part]
- for k, v in s.files().items():
- cwd[k] = self.add_entry(File(cwd.inode, v))
-
- def add_entry(self, entry):
- self.inodes_counter += 1
- entry.inode = self.inodes_counter
- self.inodes[entry.inode] = entry
- return entry
-
- def access(self, inode, mode, ctx):
- return True
-
- def getattr(self, inode):
- e = self.inodes[inode]
-
- entry = llfuse.EntryAttributes()
- entry.st_ino = inode
- entry.generation = 0
- entry.entry_timeout = 300
- entry.attr_timeout = 300
-
- entry.st_mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
- if isinstance(e, Directory):
- entry.st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH | stat.S_IFDIR
- else:
- entry.st_mode |= stat.S_IFREG
-
- entry.st_nlink = 1
- entry.st_uid = self.uid
- entry.st_gid = self.gid
- entry.st_rdev = 0
-
- entry.st_size = e.size()
-
- entry.st_blksize = 1024
- entry.st_blocks = e.size()/1024
- if e.size()/1024 != 0:
- entry.st_blocks += 1
- entry.st_atime = 0
- entry.st_mtime = 0
- entry.st_ctime = 0
-
- return entry
-
- def lookup(self, parent_inode, name):
- #print "lookup: parent_inode", parent_inode, "name", name
- inode = None
-
- if name == '.':
- inode = parent_inode
- else:
- if parent_inode in self.inodes:
- p = self.inodes[parent_inode]
- if name == '..':
- inode = p.parent_inode
- elif name in p:
- inode = p[name].inode
-
- if inode != None:
- return self.getattr(inode)
- else:
- raise llfuse.FUSEError(errno.ENOENT)
-
- def open(self, inode, flags):
- if inode in self.inodes:
- p = self.inodes[inode]
- else:
- raise llfuse.FUSEError(errno.ENOENT)
-
- if (flags & os.O_WRONLY) or (flags & os.O_RDWR):
- raise llfuse.FUSEError(errno.EROFS)
-
- if isinstance(p, Directory):
- raise llfuse.FUSEError(errno.EISDIR)
-
- fh = self._filehandles_counter
- self._filehandles_counter += 1
- self._filehandles[fh] = FileHandle(fh, p)
- return fh
-
- def read(self, fh, off, size):
- #print "read", fh, off, size
- if fh in self._filehandles:
- handle = self._filehandles[fh]
- else:
- raise llfuse.FUSEError(errno.EBADF)
-
- try:
- with llfuse.lock_released:
- return handle.entry.reader.readfrom(off, size)
- except:
- raise llfuse.FUSEError(errno.EIO)
-
- def release(self, fh):
- if fh in self._filehandles:
- del self._filehandles[fh]
-
- def opendir(self, inode):
- #print "opendir: inode", inode
-
- if inode in self.inodes:
- p = self.inodes[inode]
- else:
- raise llfuse.FUSEError(errno.ENOENT)
-
- if not isinstance(p, Directory):
- raise llfuse.FUSEError(errno.ENOTDIR)
-
- fh = self._filehandles_counter
- self._filehandles_counter += 1
- if p.parent_inode in self.inodes:
- parent = self.inodes[p.parent_inode]
- else:
- parent = None
- self._filehandles[fh] = FileHandle(fh, [('.', p), ('..', parent)] + list(p.items()))
- return fh
-
- def readdir(self, fh, off):
- #print "readdir: fh", fh, "off", off
-
- if fh in self._filehandles:
- handle = self._filehandles[fh]
- else:
- raise llfuse.FUSEError(errno.EBADF)
-
- #print "handle.entry", handle.entry
-
- e = off
- while e < len(handle.entry):
- yield (handle.entry[e][0], self.getattr(handle.entry[e][1].inode), e+1)
- e += 1
-
- def releasedir(self, fh):
- del self._filehandles[fh]
-
- def statfs(self):
- st = llfuse.StatvfsData()
- st.f_bsize = 4096
- st.f_blocks = 0
- st.f_files = 0
-
- st.f_bfree = 0
- st.f_bavail = 0
-
- st.f_ffree = 0
- st.f_favail = 0
-
- st.f_frsize = 0
- return st
-
- def create(self, p1, p2, p3, p4, p5):
- raise llfuse.FUSEError(errno.EROFS)
-
- def rename(self, inode_parent_old, name_old, inode_parent_new, name_new):
- raise llfuse.FUSEError(errno.EROFS)
-
- def unlink(self, parent_inode, name):
- raise llfuse.FUSEError(errno.EROFS)
-
- def rmdir(self, parent_inode, name):
- raise llfuse.FUSEError(errno.EROFS)
-
- def symlink(self, inode_parent, name, target, ctx):
- raise llfuse.FUSEError(errno.EROFS)
-
if __name__ == '__main__':
-
+ # Handle command line parameters
parser = argparse.ArgumentParser(
- description='Mount Keep data under the local filesystem.')
+ description='Mount Keep data under the local filesystem.',
+ epilog="""
+Note: When using the --exec feature, you must either specify the
+mountpoint before --exec, or mark the end of your --exec arguments
+with "--".
+""")
parser.add_argument('mountpoint', type=str, help="""Mount point.""")
parser.add_argument('--collection', type=str, help="""Collection locator""")
parser.add_argument('--debug', action='store_true', help="""Debug mode""")
+ parser.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
+ dest="exec_args", metavar=('command', 'args', '...', '--'),
+ help="""Mount, run a command, then unmount and exit""")
args = parser.parse_args()
+ # Create the request handler
+ operations = Operations(os.getuid(), os.getgid())
+
if args.collection != None:
- collection = arvados.CollectionReader(arvados.Keep.get(args.collection))
+ # Set up the request handler with the collection at the root
+ e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE))
+ operations.inodes.load_collection(e, arvados.CollectionReader(arvados.Keep.get(args.collection)))
else:
- collection = None
+ # Set up the request handler with the 'magic directory' at the root
+ operations.inodes.add_entry(MagicDirectory(llfuse.ROOT_INODE, operations.inodes))
- operations = Operations(collection, os.getuid(), os.getgid())
-
+ # FUSE options, see mount.fuse(8)
opts = []
- if args.debug:
- opts += ['debug']
- llfuse.init(operations, args.mountpoint, opts)
+ # Enable FUSE debugging (logs each FUSE request)
+ if args.debug:
+ opts += ['debug']
- try:
- llfuse.main()
- except:
- llfuse.close(unmount=True)
- raise
+ # Initialize the fuse connection
+ llfuse.init(operations, args.mountpoint, opts)
+
+    if args.exec_args:
+        # --exec mode: serve FUSE requests on a background thread so this
+        # thread is free to run the requested command against the mount.
+        t = threading.Thread(target=llfuse.main)
+        t.start()
- llfuse.close()
+        # wait until the driver is finished initializing
+        # (initlock comes from Operations, which is defined outside this script)
+        operations.initlock.wait()
+
+        # Exit status used when the command could not be run at all.
+        rc = 255
+        try:
+            rc = subprocess.call(args.exec_args, shell=False)
+        except OSError as e:
+            sys.stderr.write('arv-mount: %s -- exec %s\n' % (str(e), args.exec_args))
+            # errno may be unset; exiting with None would report success (0),
+            # so keep the 255 fallback in that case.
+            rc = e.errno or 255
+        except Exception as e:
+            sys.stderr.write('arv-mount: %s\n' % str(e))
+        finally:
+            # Always detach the mount (-z: lazily), whether or not the command ran.
+            subprocess.call(["fusermount", "-u", "-z", args.mountpoint])
+
+        # sys.exit rather than the site-provided exit(), which is meant for
+        # interactive sessions and is absent when site is not loaded.
+        sys.exit(rc)
+    else:
+        # Plain mount mode: serve requests on this thread until llfuse.main() returns.
+        llfuse.main()