Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19
20 import arvados.commands._util as arv_cmd
21 from arvados_fuse import crunchstat
22 from arvados_fuse import *
23 from arvados_fuse.unmount import unmount
24 from arvados_fuse._version import __version__
25
class ArgumentParser(argparse.ArgumentParser):
    """Command-line argument parser for arv-mount.

    Declares the mountpoint positional argument, the mutually exclusive
    mount-mode switches (all stored in ``mode``, plus ``--project`` and
    ``--collection``), the repeatable ``--mount-*`` options used to build a
    custom layout, cache/logging/write options, the unmount switches and
    ``--exec``.  Retry options are inherited from ``arv_cmd.retry_opt``.
    """

    def __init__(self):
        super(ArgumentParser, self).__init__(
            parents=[arv_cmd.retry_opt],
            # The effective default is --all: Mount._setup_mount() selects
            # 'all' when neither a mode nor any --mount-* option is given.
            description='''Mount Keep data under the local filesystem.  Default mode is --all''',
            epilog="""
    Note: When using the --exec feature, you must either specify the
    mountpoint before --exec, or mark the end of your --exec arguments
    with "--".
            """)
        self.add_argument('--version', action='version',
                          version=u"%s %s" % (sys.argv[0], __version__),
                          help='Print version and exit.')
        self.add_argument('mountpoint', type=str, help="""Mount point.""")
        self.add_argument('--allow-other', action='store_true',
                          help="""Let other users read the mount""")
        self.add_argument('--subtype', type=str, metavar='STRING',
                          help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")

        # Mount modes.  Every flag below stores a constant in args.mode;
        # --custom stores None, which is also the value when no mode flag
        # is given at all.
        mode_group = self.add_mutually_exclusive_group()
        mode_flags = (
            ('--all', 'all',
             """Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given)."""),
            ('--custom', None,
             """Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given)."""),
            ('--home', 'home',
             """Mount only the user's home project."""),
            ('--shared', 'shared',
             """Mount only list of projects shared with the user."""),
            ('--by-tag', 'by_tag',
             """Mount subdirectories listed by tag."""),
            ('--by-id', 'by_id',
             """Mount subdirectories listed by portable data hash or uuid."""),
            ('--by-pdh', 'by_pdh',
             """Mount subdirectories listed by portable data hash."""),
        )
        for flag, mode_name, help_text in mode_flags:
            mode_group.add_argument(flag, action='store_const', const=mode_name,
                                    dest='mode', help=help_text)
        mode_group.add_argument('--project', type=str, metavar='UUID',
                                help="""Mount the specified project.""")
        mode_group.add_argument('--collection', type=str, metavar='UUID_or_PDH',
                                help="""Mount only the specified collection.""")

        # Custom layout: each option may be repeated and collects PATH names.
        custom_group = self.add_argument_group('Custom mount options')
        custom_mounts = (
            ('--mount-by-pdh',
             "Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash."),
            ('--mount-by-id',
             "Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID."),
            ('--mount-by-tag',
             "Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID."),
            ('--mount-home',
             "Mount the current user's home project at mountpoint/PATH."),
            ('--mount-shared',
             "Mount projects shared with the current user at mountpoint/PATH."),
            ('--mount-tmp',
             "Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting."),
        )
        for flag, help_text in custom_mounts:
            # A fresh list per option so no two options share a default.
            custom_group.add_argument(flag, type=str, metavar='PATH',
                                      action='append', default=[],
                                      help=help_text)

        self.add_argument('--debug', action='store_true', help="""Debug mode""")
        self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
        self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
        self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")

        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)

        # Both switches write args.disk_cache; disk caching is the default.
        cache_group = self.add_mutually_exclusive_group()
        cache_group.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
        cache_group.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)

        self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)

        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)

        # Both switches write args.enable_write; read-only is the default.
        self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
        self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
        self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)

        self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)

        # Named unmount_group so the imported unmount() function is not
        # shadowed inside this method.
        unmount_group = self.add_mutually_exclusive_group()
        unmount_group.add_argument('--unmount', action='store_true', default=False,
                                   help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_group.add_argument('--unmount-all', action='store_true', default=False,
                                   help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_group.add_argument('--replace', action='store_true', default=False,
                                   help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
        self.add_argument('--unmount-timeout',
                          type=float, default=2.0,
                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")

        # REMAINDER: everything after --exec is collected into args.exec_args.
        self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                          dest="exec_args", metavar=('command', 'args', '...', '--'),
                          help="""Mount, run a command, then unmount and exit""")
123
124
class Mount(object):
    """One arv-mount invocation, configured from parsed command-line args.

    Construction sets up logging, the API client and the directory tree to
    be served.  The object is also a context manager: entering it
    initializes llfuse on the mountpoint and starts the llfuse main loop in
    a background thread; leaving it unmounts and waits for that thread.
    ``run()`` dispatches to unmount, exec or standalone operation.
    """

    def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
        """Prepare the mount described by ``args``.

        Args:
            args: parsed arguments (as produced by ArgumentParser); the
                ``mountpoint`` and ``logfile`` attributes are rewritten in
                place to their real paths.
            logger: logger used for this object's own messages.

        Exits the process with status 1 if any setup step raises.
        """
        self.daemon = False
        self.logger = logger
        self.args = args
        self.listen_for_events = False

        # Resolve paths now: __enter__ may daemonize, which changes the
        # working directory.
        self.args.mountpoint = os.path.realpath(self.args.mountpoint)
        if self.args.logfile:
            self.args.logfile = os.path.realpath(self.args.logfile)

        try:
            self._setup_logging()
            self._setup_api()
            self._setup_mount()
        except Exception as e:
            self.logger.exception("arv-mount: exception during setup: %s", e)
            sys.exit(1)

    def __enter__(self):
        """Mount the filesystem and start serving it in a background thread.

        Returns self once ``operations.initlock`` has been signalled.
        """
        if self.args.replace:
            unmount(path=self.args.mountpoint,
                    timeout=self.args.unmount_timeout)
        llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
        if self.daemon:
            # Keep every descriptor from 3 up to the hard RLIMIT_NOFILE
            # limit open across daemonization.
            daemon.DaemonContext(
                working_directory=os.path.dirname(self.args.mountpoint),
                files_preserve=list(range(
                    3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
            ).open()
        if self.listen_for_events and not self.args.disable_event_listening:
            self.operations.listen_for_events()
        self.llfuse_thread = threading.Thread(target=self._llfuse_main)
        self.llfuse_thread.daemon = True
        self.llfuse_thread.start()
        self.operations.initlock.wait()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the event subscription, unmount, and wait for llfuse.

        Waits at most ``args.unmount_timeout`` seconds for the llfuse
        thread; if it is still alive a warning is logged and the thread is
        abandoned.
        """
        if self.operations.events:
            self.operations.events.close(timeout=self.args.unmount_timeout)
        subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
        if self.llfuse_thread.is_alive():
            self.logger.warning("Mount.__exit__:"
                                " llfuse thread still alive %fs after umount"
                                " -- abandoning and exiting anyway",
                                self.args.unmount_timeout)

    def run(self):
        """Perform the action selected on the command line.

        --unmount/--unmount-all only unmount; --exec mounts around a child
        command; otherwise the mount is served until it ends.
        """
        if self.args.unmount or self.args.unmount_all:
            unmount(path=self.args.mountpoint,
                    subtype=self.args.subtype,
                    timeout=self.args.unmount_timeout,
                    recursive=self.args.unmount_all)
        elif self.args.exec_args:
            self._run_exec()
        else:
            self._run_standalone()

    def _fuse_options(self):
        """FUSE mount options; see mount.fuse(8)"""
        opts = [optname for optname in ['allow_other', 'debug']
                if getattr(self.args, optname)]
        # Increase default read/write size from 4KiB to 128KiB
        opts += ["big_writes", "max_read=131072"]
        if self.args.subtype:
            opts += ["subtype="+self.args.subtype]
        return opts

    def _setup_logging(self):
        """Configure log destination and verbosity from --logfile/--debug."""
        if self.args.logfile:
            # Replace the arvados package's handler with a file handler.
            log_handler = logging.FileHandler(self.args.logfile)
            log_handler.setFormatter(logging.Formatter(
                '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
                '%Y-%m-%d %H:%M:%S'))
            arvados.logger.removeHandler(arvados.log_handler)
            arvados.logger.addHandler(log_handler)

        if self.args.debug:
            arvados.logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
            logging.getLogger('arvados.api').setLevel(logging.DEBUG)
            logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
            self.logger.debug("arv-mount debugging enabled")

        self.logger.info("%s %s started", sys.argv[0], __version__)
        self.logger.info("enable write is %s", self.args.enable_write)

    def _setup_api(self):
        """Create the API client and verify it can fetch the current user.

        Exits with status 1 if building the client raises KeyError, which
        is reported as missing environment.
        """
        try:
            self.api = arvados.safeapi.ThreadSafeApiCache(
                apiconfig=arvados.config.settings(),
                # default value of file_cache is 0, this tells KeepBlockCache to
                # choose a default based on whether disk_cache is enabled or not.
                keep_params={
                    'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
                                                               disk_cache=self.args.disk_cache,
                                                               disk_cache_dir=self.args.disk_cache_dir),
                    'num_retries': self.args.retries,
                })
        except KeyError as e:
            self.logger.error("Missing environment: %s", e)
            sys.exit(1)
        # Do a sanity check that we have a working arvados host + token.
        self.api.users().current().execute()

    def _setup_mount(self):
        """Build the Operations object and populate the root of the mount.

        A single-purpose mode (collection, project, by_id, by_pdh, by_tag,
        shared, home) places one directory object at the root inode.
        Otherwise a plain root Directory is created and one entry is added
        per --mount-* PATH; mode 'all' is expressed as a fixed set of such
        entries plus a README.

        Exits with an error message if a mode is combined with --mount-*
        options.
        """
        self.operations = Operations(
            os.getuid(),
            os.getgid(),
            api_client=self.api,
            encoding=self.args.encoding,
            inode_cache=InodeCache(cap=self.args.directory_cache),
            enable_write=self.args.enable_write)

        if self.args.crunchstat_interval:
            statsthread = threading.Thread(
                target=crunchstat.statlogger,
                args=(self.args.crunchstat_interval,
                      self.api.keep,
                      self.operations))
            statsthread.daemon = True
            statsthread.start()

        usr = self.api.users().current().execute(num_retries=self.args.retries)
        now = time.time()
        dir_class = None
        # Leading positional arguments shared by the directory constructors;
        # mode-specific arguments are appended below.
        dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
        mount_readme = False

        storage_classes = None
        if self.args.storage_classes is not None:
            storage_classes = self.args.storage_classes.replace(' ', '').split(',')
            self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))

        if self.args.collection is not None:
            # Set up the request handler with the collection at the root
            # First check that the collection is readable
            self.api.collections().get(uuid=self.args.collection).execute()
            self.args.mode = 'collection'
            dir_class = CollectionDirectory
            dir_args.append(self.args.collection)
        elif self.args.project is not None:
            self.args.mode = 'project'
            dir_class = ProjectDirectory
            dir_args.append(
                self.api.groups().get(uuid=self.args.project).execute(
                    num_retries=self.args.retries))

        has_custom_mounts = bool(
            self.args.mount_by_id or
            self.args.mount_by_pdh or
            self.args.mount_by_tag or
            self.args.mount_home or
            self.args.mount_shared or
            self.args.mount_tmp)
        if has_custom_mounts:
            if self.args.mode is not None:
                sys.exit(
                    "Cannot combine '{}' mode with custom --mount-* options.".
                    format(self.args.mode))
        elif self.args.mode is None:
            # Neither a mode nor any --mount-* option: --all is the default
            self.args.mode = 'all'

        if self.args.mode in ['by_id', 'by_pdh']:
            # Set up the request handler with the 'magic directory' at the root
            dir_class = MagicDirectory
            dir_args.append(self.args.mode == 'by_pdh')
        elif self.args.mode == 'by_tag':
            dir_class = TagsDirectory
        elif self.args.mode == 'shared':
            dir_class = SharedDirectory
            dir_args.append(usr)
        elif self.args.mode == 'home':
            dir_class = ProjectDirectory
            dir_args.append(usr)
            dir_args.append(True)
        elif self.args.mode == 'all':
            self.args.mount_by_id = ['by_id']
            self.args.mount_by_tag = ['by_tag']
            self.args.mount_home = ['home']
            self.args.mount_shared = ['shared']
            mount_readme = True

        if dir_class is not None:
            # Single-purpose mount: the chosen directory is the root.
            # TagsDirectory and CollectionDirectory are constructed without
            # the storage_classes keyword.
            if dir_class in [TagsDirectory, CollectionDirectory]:
                ent = dir_class(*dir_args)
            else:
                ent = dir_class(*dir_args, storage_classes=storage_classes)
            self.operations.inodes.add_entry(ent)
            self.listen_for_events = ent.want_event_subscribe()
            return

        # Custom layout: a plain root directory with one child per PATH.
        e = self.operations.inodes.add_entry(Directory(
            llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
        dir_args[0] = e.inode

        for name in self.args.mount_by_id:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
        for name in self.args.mount_by_pdh:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
        for name in self.args.mount_by_tag:
            self._add_mount(e, name, TagsDirectory(*dir_args))
        for name in self.args.mount_home:
            self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
        for name in self.args.mount_shared:
            self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
        for name in self.args.mount_tmp:
            self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))

        if mount_readme:
            text = self._readme_text(
                arvados.config.get('ARVADOS_API_HOST'),
                usr['email'])
            self._add_mount(e, 'README', StringFile(e.inode, text, now))

    def _add_mount(self, tld, name, ent):
        """Register ``ent`` as child ``name`` of the top-level directory ``tld``.

        Exits with an error message if ``name`` is empty, '.', '..' or
        contains a slash.  Also records whether any added entry wants an
        event subscription.
        """
        if name in ['', '.', '..'] or '/' in name:
            sys.exit("Mount point '{}' is not supported.".format(name))
        tld._entries[name] = self.operations.inodes.add_entry(ent)
        self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())

    def _readme_text(self, api_host, user_email):
        """Return the README contents shown at the root of an --all mount."""
        return '''
Welcome to Arvados!  This directory provides file system access to
files and objects available on the Arvados installation located at
'{}' using credentials for user '{}'.

From here, the following directories are available:

  by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
  by_tag/    Access to Keep collections organized by tag.
  home/      The contents of your home project.
  shared/    Projects shared with you.

'''.format(api_host, user_email)

    def _run_exec(self):
        """Mount, run the --exec command, unmount, and exit.

        The exit status is the command's return code.  If running the
        command raises, the status is the exception's errno when that is a
        non-zero value, and 255 otherwise.
        """
        rc = 255
        with self:
            try:
                sp = subprocess.Popen(self.args.exec_args, shell=False)

                # forward signals to the process.
                signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
                signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
                signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))

                try:
                    # wait for process to complete.
                    rc = sp.wait()
                finally:
                    # restore default signal handlers, also when waiting
                    # failed, so the unmount that follows stays interruptible.
                    signal.signal(signal.SIGINT, signal.SIG_DFL)
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    signal.signal(signal.SIGQUIT, signal.SIG_DFL)
            except Exception as e:
                self.logger.exception(
                    'arv-mount: exception during exec %s', self.args.exec_args)
                # An errno of None or 0 would make sys.exit() report
                # success; keep the failure code in that case.
                rc = getattr(e, 'errno', None) or rc
        sys.exit(rc)

    def _run_standalone(self):
        """Serve the mount until the llfuse thread ends, then exit.

        Daemonizes first unless --foreground was given.  Exits 0 on a
        normal end; on an exception, exits with its errno when that is a
        non-zero value and with 1 otherwise.
        """
        try:
            self.daemon = not self.args.foreground
            with self:
                self.llfuse_thread.join(timeout=None)
        except Exception as e:
            self.logger.exception('arv-mount: exception during mount: %s', e)
            # errno may be absent or None (e.g. OSError("msg")); never let
            # a failure turn into exit status 0.
            sys.exit(getattr(e, 'errno', None) or 1)
        sys.exit(0)

    def _llfuse_main(self):
        """Run the llfuse main loop and close llfuse when it returns."""
        try:
            llfuse.main()
        except BaseException:
            # Clean up on any failure, then re-raise.
            llfuse.close(unmount=False)
            raise
        llfuse.close()