7751: Add --mount-tmp option.
author Tom Clegg <tom@curoverse.com>
Wed, 18 Nov 2015 19:32:18 +0000 (14:32 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 25 Nov 2015 15:44:58 +0000 (10:44 -0500)
services/arv-web/arv-web.py
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/bin/arv-mount
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_mount.py

index 482a5776d92d08306ef8327339200418b887c723..5a95e27b93b26789d10d93f4dadeac849fcfaa9b 100755 (executable)
@@ -83,7 +83,7 @@ class ArvWeb(object):
     def run_fuse_mount(self):
         self.mountdir = tempfile.mkdtemp()
 
-        self.operations = Operations(os.getuid(), os.getgid(), "utf-8")
+        self.operations = Operations(os.getuid(), os.getgid(), self.api, "utf-8")
         self.cdir = CollectionDirectory(llfuse.ROOT_INODE, self.operations.inodes, self.api, 2, self.collection)
         self.operations.inodes.add_entry(self.cdir)
 
index fd25aa9b5eacab7e4b6038df50387375147513c6..55f1ad74883698fb39749c1070ca71cd19a873f6 100644 (file)
@@ -76,7 +76,7 @@ import Queue
 
 llfuse.capi._notify_queue = Queue.Queue()
 
-from fusedir import sanitize_filename, Directory, CollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
 from fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
@@ -304,9 +304,11 @@ class Operations(llfuse.Operations):
 
     """
 
-    def __init__(self, uid, gid, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
+    def __init__(self, uid, gid, api_client, encoding="utf-8", inode_cache=None, num_retries=4, enable_write=False):
         super(Operations, self).__init__()
 
+        self._api_client = api_client
+
         if not inode_cache:
             inode_cache = InodeCache(cap=256*1024*1024)
         self.inodes = Inodes(inode_cache, encoding=encoding)
@@ -347,8 +349,8 @@ class Operations(llfuse.Operations):
     def access(self, inode, mode, ctx):
         return True
 
-    def listen_for_events(self, api_client):
-        self.events = arvados.events.subscribe(api_client,
+    def listen_for_events(self):
+        self.events = arvados.events.subscribe(self._api_client,
                                  [["event_type", "in", ["create", "update", "delete"]]],
                                  self.on_event)
 
index fdc93fb3beb37b7e4d4eda8400eb6f72ff82423a..21961a56f41d8776133173565edfad030c46444c 100644 (file)
@@ -474,6 +474,97 @@ class CollectionDirectory(CollectionDirectoryBase):
             self.collection.stop_threads()
 
 
+class TmpCollectionDirectory(CollectionDirectoryBase):
+    """A directory backed by an Arvados collection that never gets saved.
+
+    This supports using Keep as scratch space. A userspace program can
+    read the .arvados#collection file to get a current manifest in
+    order to save a snapshot of the scratch data or use it as a crunch
+    job output.
+    """
+
+    def __init__(self, parent_inode, inodes, api_client, num_retries):
+        """Create an empty, unsaved collection and wrap it as a directory.
+
+        num_retries is passed through to the underlying Collection.
+        """
+        collection = arvados.collection.Collection(
+            api_client=api_client,
+            keep_client=api_client.keep,
+            num_retries=num_retries)
+        # Divert save()/save_new() so nothing is ever written to the
+        # API server; committing only flushes data blocks.
+        collection.save = self._commit_collection
+        collection.save_new = self._commit_collection
+        super(TmpCollectionDirectory, self).__init__(
+            parent_inode, inodes, collection)
+        self.collection_record_file = None
+        self._subscribed = False
+        self._update_collection_record()
+
+    def update(self, *args, **kwargs):
+        """Populate the directory from the collection, first call only.
+
+        Arguments are accepted and ignored.
+        """
+        if not self._subscribed:
+            with llfuse.lock_released:
+                self.populate(self.mtime())
+            self._subscribed = True
+
+    @use_counter
+    def _commit_collection(self):
+        """Commit the data blocks, but don't save the collection to API.
+
+        Update the content of the special .arvados#collection file, if
+        it has been instantiated.
+        """
+        self.collection.flush()
+        self._update_collection_record()
+        if self.collection_record_file is not None:
+            self.collection_record_file.update(self.collection_record)
+            self.inodes.invalidate_inode(self.collection_record_file.inode)
+
+    def _update_collection_record(self):
+        """Rebuild the record served as .arvados#collection.
+
+        uuid is always None because the collection is never saved.
+        """
+        self.collection_record = {
+            "uuid": None,
+            "manifest_text": self.collection.manifest_text(),
+            "portable_data_hash": self.collection.portable_data_hash(),
+        }
+
+    def __contains__(self, k):
+        """Report the synthetic .arvados#collection entry as present."""
+        return (k == '.arvados#collection' or
+                super(TmpCollectionDirectory, self).__contains__(k))
+
+    @use_counter
+    def __getitem__(self, item):
+        """Return the entry named item.
+
+        The .arvados#collection file is created and registered with
+        inodes on first lookup, then reused.
+        """
+        if item == '.arvados#collection':
+            if self.collection_record_file is None:
+                self.collection_record_file = ObjectFile(
+                    self.inode, self.collection_record)
+                self.inodes.add_entry(self.collection_record_file)
+            return self.collection_record_file
+        return super(TmpCollectionDirectory, self).__getitem__(item)
+
+    def writable(self):
+        """A scratch directory is always writable."""
+        return True
+
+    def finalize(self):
+        """Call stop_threads() on the underlying collection."""
+        self.collection.stop_threads()
+
+
 class MagicDirectory(Directory):
     """A special directory that logically contains the set of all extant keep locators.
 
index 44bc698299975230792de112fa2644e6b458c7b8..a6799051bb4794671967a9f6b97853312b2345ac 100755 (executable)
@@ -7,6 +7,7 @@ import logging
 import os
 import signal
 import subprocess
+import sys
 import time
 
 import arvados.commands._util as arv_cmd
@@ -92,17 +93,44 @@ with "--".
 
     mount_mode = parser.add_mutually_exclusive_group()
 
-    mount_mode.add_argument('--all', action='store_true', help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default).""")
-    mount_mode.add_argument('--home', action='store_true', help="""Mount only the user's home project.""")
-    mount_mode.add_argument('--shared', action='store_true', help="""Mount only list of projects shared with the user.""")
-    mount_mode.add_argument('--by-tag', action='store_true',
+    mount_mode.add_argument('--all', action='store_const', const='all', dest='mode',
+                            help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
+    mount_mode.add_argument('--custom', action='store_const', const=None, dest='mode',
+                            help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
+    mount_mode.add_argument('--home', action='store_const', const='home', dest='mode',
+                            help="""Mount only the user's home project.""")
+    mount_mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
+                            help="""Mount only list of projects shared with the user.""")
+    mount_mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
                             help="""Mount subdirectories listed by tag.""")
-    mount_mode.add_argument('--by-id', action='store_true',
+    mount_mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
                             help="""Mount subdirectories listed by portable data hash or uuid.""")
-    mount_mode.add_argument('--by-pdh', action='store_true',
+    mount_mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
                             help="""Mount subdirectories listed by portable data hash.""")
-    mount_mode.add_argument('--project', type=str, help="""Mount a specific project.""")
-    mount_mode.add_argument('--collection', type=str, help="""Mount only the specified collection.""")
+    mount_mode.add_argument('--project', type=str, metavar='UUID',
+                            help="""Mount the specified project.""")
+    mount_mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
+                            help="""Mount only the specified collection.""")
+
+    mounts = parser.add_argument_group('Custom mount options')
+    mounts.add_argument('--mount-by-pdh',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
+    mounts.add_argument('--mount-by-id',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
+    mounts.add_argument('--mount-by-tag',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
+    mounts.add_argument('--mount-home',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Mount the current user's home project at mountpoint/PATH.")
+    mounts.add_argument('--mount-shared',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Mount projects shared with the current user at mountpoint/PATH.")
+    mounts.add_argument('--mount-tmp',
+                        type=str, metavar='PATH', action='append', default=[],
+                        help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
 
     parser.add_argument('--debug', action='store_true', help="""Debug mode""")
     parser.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
@@ -154,14 +182,16 @@ with "--".
     logger.info("enable write is %s", args.enable_write)
 
     try:
+        api = ThreadSafeApiCache(apiconfig=arvados.config.settings(),
+                                 keep_params={"block_cache": arvados.keep.KeepBlockCache(args.file_cache)})
+
         # Create the request handler
         operations = Operations(os.getuid(),
                                 os.getgid(),
+                                api_client=api,
                                 encoding=args.encoding,
                                 inode_cache=InodeCache(cap=args.directory_cache),
                                 enable_write=args.enable_write)
-        api = ThreadSafeApiCache(apiconfig=arvados.config.settings(),
-                                 keep_params={"block_cache": arvados.keep.KeepBlockCache(args.file_cache)})
 
         if args.crunchstat_interval:
             statsthread = threading.Thread(target=statlogger, args=(args.crunchstat_interval, api.keep, operations))
@@ -172,19 +202,38 @@ with "--".
         now = time.time()
         dir_class = None
         dir_args = [llfuse.ROOT_INODE, operations.inodes, api, args.retries]
-        if args.by_id or args.by_pdh:
+        mount_readme = False
+
+        # A fixed mode and custom --mount-* options are mutually exclusive.
+        if args.mode is not None and (
+                args.mount_by_id or
+                args.mount_by_pdh or
+                args.mount_by_tag or
+                args.mount_home or
+                args.mount_shared or
+                args.mount_tmp):
+            sys.exit("Cannot combine '{}' mode with custom --mount-* options.".
+                     format(args.mode))
+
+        if args.mode in ['by_id', 'by_pdh']:
             # Set up the request handler with the 'magic directory' at the root
             dir_class = MagicDirectory
-            dir_args.append(args.by_pdh)
-        elif args.by_tag:
+            dir_args.append(args.mode == 'by_pdh')
+        elif args.mode == 'by_tag':
             dir_class = TagsDirectory
-        elif args.shared:
+        elif args.mode == 'shared':
             dir_class = SharedDirectory
             dir_args.append(usr)
-        elif args.home:
+        elif args.mode == 'home':
             dir_class = ProjectDirectory
             dir_args.append(usr)
             dir_args.append(True)
+        elif args.mode == 'all':
+            args.mount_by_id = ['by_id']
+            args.mount_by_tag = ['by_tag']
+            args.mount_home = ['home']
+            args.mount_shared = ['shared']
+            mount_readme = True
         elif args.collection is not None:
             # Set up the request handler with the collection at the root
             dir_class = CollectionDirectory
@@ -200,19 +249,29 @@ with "--".
             e = operations.inodes.add_entry(Directory(llfuse.ROOT_INODE, operations.inodes))
             dir_args[0] = e.inode
 
-            e._entries['by_id'] = operations.inodes.add_entry(MagicDirectory(*dir_args))
-
-            e._entries['by_tag'] = operations.inodes.add_entry(TagsDirectory(*dir_args))
-
-            dir_args.append(usr)
-            dir_args.append(True)
-            e._entries['home'] = operations.inodes.add_entry(ProjectDirectory(*dir_args))
-            e._entries['shared'] = operations.inodes.add_entry(SharedDirectory(*dir_args))
-
-            text = '''
-Welcome to Arvados!  This directory provides file system access to files and objects
-available on the Arvados installation located at '{}'
-using credentials for user '{}'.
+            def addMount(tld, name, ent):
+                if name in ['', '.', '..'] or '/' in name:
+                    sys.exit("Mount point '{}' is not supported.".format(name))
+                tld._entries[name] = operations.inodes.add_entry(ent)
+
+            for name in args.mount_by_id:
+                addMount(e, name, MagicDirectory(*dir_args, pdh_only=False))
+            for name in args.mount_by_pdh:
+                addMount(e, name, MagicDirectory(*dir_args, pdh_only=True))
+            for name in args.mount_by_tag:
+                addMount(e, name, TagsDirectory(*dir_args))
+            for name in args.mount_home:
+                addMount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True))
+            for name in args.mount_shared:
+                addMount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True))
+            for name in args.mount_tmp:
+                addMount(e, name, TmpCollectionDirectory(*dir_args))
+
+            if mount_readme:
+                text = '''
+Welcome to Arvados!  This directory provides file system access to
+files and objects available on the Arvados installation located at
+'{}' using credentials for user '{}'.
 
 From here, the following directories are available:
 
@@ -221,9 +280,7 @@ From here, the following directories are available:
   home/      The contents of your home project.
   shared/    Projects shared with you.
 '''.format(arvados.config.get('ARVADOS_API_HOST'), usr['email'])
-
-            e._entries["README"] = operations.inodes.add_entry(StringFile(e.inode, text, now))
-
+                addMount(e, 'README', StringFile(e.inode, text, now))
 
     except Exception:
         logger.exception("arv-mount: exception during API setup")
@@ -241,8 +298,8 @@ From here, the following directories are available:
         llfuse.init(operations, args.mountpoint, opts)
 
         # Subscribe to change events from API server
-        if not args.by_pdh:
-            operations.listen_for_events(api)
+        if args.mode != 'by_pdh':
+            operations.listen_for_events()
 
         t = threading.Thread(None, lambda: llfuse.main())
         t.start()
@@ -283,7 +340,7 @@ From here, the following directories are available:
             llfuse.init(operations, args.mountpoint, opts)
 
             # Subscribe to change events from API server
-            operations.listen_for_events(api)
+            operations.listen_for_events()
 
             llfuse.main()
         except Exception as e:
index 3b7cbaaadb0a8ab9c490c1a73a1648b54d89728c..9fb24dbcb133db69e82b96b7e2a1c655dd1fc508 100644 (file)
@@ -35,7 +35,10 @@ class MountTestBase(unittest.TestCase):
         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def make_mount(self, root_class, **root_kwargs):
-        self.operations = fuse.Operations(os.getuid(), os.getgid(), enable_write=True)
+        self.operations = fuse.Operations(
+            os.getuid(), os.getgid(),
+            api_client=self.api,
+            enable_write=True)
         self.operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(self.operations, self.mounttmp, [])
index 1d7b9087ab415c563ed92e795337abd582a78f9f..05c8685607f14a0e7938379ab8f4fab7f3001e63 100644 (file)
@@ -703,7 +703,7 @@ class FuseUpdateFromEventTest(MountTestBase):
         with llfuse.lock:
             m.new_collection(collection.api_response(), collection)
 
-        self.operations.listen_for_events(self.api)
+        self.operations.listen_for_events()
 
         d1 = llfuse.listdir(os.path.join(self.mounttmp))
         self.assertEqual([], sorted(d1))