21504: Refine metavars for arv-mount options
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="""
32 Mount Keep data under the local filesystem.  Default mode is --home
33 """,
34             epilog="""
35     Note: When using the --exec feature, you must either specify the
36     mountpoint before --exec, or mark the end of your --exec arguments
37     with "--".
38             """)
39         self.add_argument(
40             '--version',
41             action='version',
42             version=u"%s %s" % (sys.argv[0], __version__),
43             help='Print version and exit.',
44         )
45         self.add_argument(
46             'mountpoint',
47             metavar='MOUNT_DIR',
48             help="""Mount point.""",
49         )
50
51         mode_group = self.add_argument_group("Mount contents")
52         mode = mode_group.add_mutually_exclusive_group()
53         mode.add_argument(
54             '--all',
55             action='store_const',
56             const='all',
57             dest='mode',
58             help="""
59 Mount a subdirectory for each mode: home, shared, by_tag, by_id
60 (default if no --mount-* arguments are given).
61 """,
62         )
63         mode.add_argument(
64             '--custom',
65             action='store_const',
66             const=None,
67             dest='mode',
68             help="""
69 Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments
70 (default if any --mount-* arguments are given).
71 """,
72         )
73         mode.add_argument(
74             '--collection',
75             metavar='UUID_OR_PDH',
76             help="""Mount only the specified collection.""",
77         )
78         mode.add_argument(
79             '--home',
80             action='store_const',
81             const='home',
82             dest='mode',
83             help="""Mount only the user's home project.""",
84         )
85         mode.add_argument(
86             '--project',
87             metavar='UUID',
88             help="""Mount the specified project.""",
89         )
90         mode.add_argument(
91             '--shared',
92             action='store_const',
93             const='shared',
94             dest='mode',
95             help="""Mount only list of projects shared with the user.""",
96         )
97         mode.add_argument(
98             '--by-id',
99             action='store_const',
100             const='by_id',
101             dest='mode',
102             help="""Mount subdirectories listed by portable data hash or uuid.""",
103         )
104         mode.add_argument(
105             '--by-pdh',
106             action='store_const',
107             const='by_pdh',
108             dest='mode',
109             help="""Mount subdirectories listed by portable data hash.""",
110         )
111         mode.add_argument(
112             '--by-tag',
113             action='store_const',
114             const='by_tag',
115             dest='mode',
116             help="""Mount subdirectories listed by tag.""",
117         )
118
119         mounts = self.add_argument_group("Mount custom layout and filtering")
120         mounts.add_argument(
121             '--filters',
122             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
123             help="""
124 Filters to apply to all project, shared, and tag directory contents.
125 Pass filters as either a JSON string or a path to a JSON file.
126 The JSON object should be a list of filters in Arvados API list filter syntax.
127 """,
128         )
129         mounts.add_argument(
130             '--mount-home',
131             metavar='PATH',
132             action='append',
133             default=[],
134             help="Mount the current user's home project at mountpoint/PATH.",
135         )
136         mounts.add_argument(
137             '--mount-by-id',
138             metavar='PATH',
139             action='append',
140             default=[],
141             help="""
142 Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH
143 where PDH is the collection's portable data hash and UUID is its UUID.
144 """,
145         )
146         mounts.add_argument(
147             '--mount-by-pdh',
148             metavar='PATH',
149             action='append',
150             default=[],
151             help="""
152 Mount each readable collection at mountpoint/PATH/P
153 where P is the collection's portable data hash.
154 """,
155         )
156         mounts.add_argument(
157             '--mount-shared',
158             metavar='PATH',
159             action='append',
160             default=[],
161             help="Mount projects shared with the current user at mountpoint/PATH.",
162         )
163         mounts.add_argument(
164             '--mount-by-tag',
165             metavar='PATH',
166             action='append',
167             default=[],
168             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.",
169         )
170         mounts.add_argument(
171             '--mount-tmp',
172             metavar='PATH',
173             action='append',
174             default=[],
175             help="""
176 Create a new collection, mount it in read/write mode at mountpoint/PATH,
177 and delete it when unmounting.
178 """,
179         )
180
181         perms = self.add_argument_group("Mount access and permissions")
182         perms.add_argument(
183             '--allow-other',
184             action='store_true',
185             help="""Let other users read the mount""",
186         )
187         perms.add_argument(
188             '--read-only',
189             action='store_false',
190             default=False,
191             dest='enable_write',
192             help="Mount will be read only (default)",
193         )
194         perms.add_argument(
195             '--read-write',
196             action='store_true',
197             default=False,
198             dest='enable_write',
199             help="Mount will be read-write",
200         )
201
202         lifecycle = self.add_argument_group("Mount lifecycle management")
203         lifecycle.add_argument(
204             '--exec',
205             nargs=argparse.REMAINDER,
206             dest="exec_args",
207             help="""Mount, run a command, then unmount and exit""",
208         )
209         lifecycle.add_argument(
210             '--foreground',
211             action='store_true',
212             default=False,
213             help="""Run in foreground (default is to daemonize unless --exec specified)""",
214         )
215         lifecycle.add_argument(
216             '--subtype',
217             help="""Report mounted filesystem type as "fuse.SUBTYPE", instead of just "fuse".""",
218         )
219         unmount = lifecycle.add_mutually_exclusive_group()
220         unmount.add_argument(
221             '--replace',
222             action='store_true',
223             default=False,
224             help="""
225 If a fuse mount is already present at mountpoint, forcefully unmount it before mounting
226 """,
227         )
228         unmount.add_argument(
229             '--unmount',
230             action='store_true',
231             default=False,
232             help="""
233 Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.
234 If --subtype is given, unmount only if the mount has the specified subtype.
235 WARNING: This command can affect any kind of fuse mount, not just arv-mount.
236 """,
237         )
238         unmount.add_argument(
239             '--unmount-all',
240             action='store_true',
241             default=False,
242             help="""
243 Forcefully unmount every fuse mount at or below the specified path and exit.
244 If --subtype is given, unmount only mounts that have the specified subtype.
245 Exit non-zero if any other types of mounts are found at or below the given path.
246 WARNING: This command can affect any kind of fuse mount, not just arv-mount.
247 """,
248         )
249         lifecycle.add_argument(
250             '--unmount-timeout',
251             type=float,
252             default=2.0,
253             metavar='SECONDS',
254             help="""
255 Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted
256 """,
257         )
258
259         reporting = self.add_argument_group("Mount logging and statistics")
260         reporting.add_argument(
261             '--crunchstat-interval',
262             type=float,
263             default=0.0,
264             metavar='SECONDS',
265             help="Write stats to stderr every N seconds (default disabled)",
266         )
267         reporting.add_argument(
268             '--debug',
269             action='store_true',
270             help="""Debug mode""",
271         )
272         reporting.add_argument(
273             '--logfile',
274             help="""Write debug logs and errors to the specified file (default stderr).""",
275         )
276
277         cache = self.add_argument_group("Mount local cache setup")
278         cachetype = cache.add_mutually_exclusive_group()
279         cachetype.add_argument(
280             '--disk-cache',
281             action='store_true',
282             default=True,
283             dest='disk_cache',
284             help="Use disk based caching (default)",
285         )
286         cachetype.add_argument(
287             '--ram-cache',
288             action='store_false',
289             default=True,
290             dest='disk_cache',
291             help="Use in-memory caching only",
292         )
293         cache.add_argument(
294             '--disk-cache-dir',
295             metavar="DIRECTORY",
296             help="Disk cache location (default ~/.cache/arvados/keep)",
297         )
298         cache.add_argument(
299             '--directory-cache',
300             type=int,
301             default=128*1024*1024,
302             metavar='BYTES',
303             help="Directory data cache size, in bytes (default 128 MiB)",
304         )
305         cache.add_argument(
306             '--file-cache',
307             type=int,
308             default=0,
309             metavar='BYTES',
310             help="""
311 File data cache size, in bytes
312 (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)
313 """,
314         )
315
316         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
317         plumbing.add_argument(
318             '--disable-event-listening',
319             action='store_true',
320             dest='disable_event_listening',
321             default=False,
322             help="Don't subscribe to events on the API server",
323         )
324         plumbing.add_argument(
325             '--encoding',
326             default="utf-8",
327             help="""
328 Character encoding to use for filesystem, default is utf-8
329 (see Python codec registry for list of available encodings)
330 """,
331         )
332         plumbing.add_argument(
333             '--storage-classes',
334             metavar='CLASSES',
335             help="Specify comma separated list of storage classes to be used when saving data of new collections",
336         )
337
338
339 class Mount(object):
340     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
341         self.daemon = False
342         self.logger = logger
343         self.args = args
344         self.listen_for_events = False
345
346         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
347         if self.args.logfile:
348             self.args.logfile = os.path.realpath(self.args.logfile)
349
350         try:
351             self._setup_logging()
352         except Exception as e:
353             self.logger.exception("exception during setup: %s", e)
354             exit(1)
355
356         try:
357             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
358
359             minlimit = 10240
360             if self.args.file_cache:
361                 # Adjust the file handle limit so it can meet
362                 # the desired cache size. Multiply by 8 because the
363                 # number of 64 MiB cache slots that keepclient
364                 # allocates is RLIMIT_NOFILE / 8
365                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
366
367             if nofile_limit[0] < minlimit:
368                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
369
370             if minlimit > nofile_limit[1]:
371                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
372
373         except Exception as e:
374             self.logger.warning("unable to adjust file handle limit: %s", e)
375
376         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
377         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
378
379         try:
380             self._setup_api()
381             self._setup_mount()
382         except Exception as e:
383             self.logger.exception("exception during setup: %s", e)
384             exit(1)
385
386     def __enter__(self):
387         if self.args.replace:
388             unmount(path=self.args.mountpoint,
389                     timeout=self.args.unmount_timeout)
390         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
391         if self.daemon:
392             daemon.DaemonContext(
393                 working_directory=os.path.dirname(self.args.mountpoint),
394                 files_preserve=list(range(
395                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
396             ).open()
397         if self.listen_for_events and not self.args.disable_event_listening:
398             self.operations.listen_for_events()
399         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
400         self.llfuse_thread.daemon = True
401         self.llfuse_thread.start()
402         self.operations.initlock.wait()
403         return self
404
405     def __exit__(self, exc_type, exc_value, traceback):
406         if self.operations.events:
407             self.operations.events.close(timeout=self.args.unmount_timeout)
408         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
409         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
410         if self.llfuse_thread.is_alive():
411             self.logger.warning("Mount.__exit__:"
412                                 " llfuse thread still alive %fs after umount"
413                                 " -- abandoning and exiting anyway",
414                                 self.args.unmount_timeout)
415
416     def run(self):
417         if self.args.unmount or self.args.unmount_all:
418             unmount(path=self.args.mountpoint,
419                     subtype=self.args.subtype,
420                     timeout=self.args.unmount_timeout,
421                     recursive=self.args.unmount_all)
422         elif self.args.exec_args:
423             self._run_exec()
424         else:
425             self._run_standalone()
426
427     def _fuse_options(self):
428         """FUSE mount options; see mount.fuse(8)"""
429         opts = [optname for optname in ['allow_other', 'debug']
430                 if getattr(self.args, optname)]
431         # Increase default read/write size from 4KiB to 128KiB
432         opts += ["big_writes", "max_read=131072"]
433         if self.args.subtype:
434             opts += ["subtype="+self.args.subtype]
435         return opts
436
437     def _setup_logging(self):
438         # Configure a log handler based on command-line switches.
439         if self.args.logfile:
440             log_handler = logging.FileHandler(self.args.logfile)
441             log_handler.setFormatter(logging.Formatter(
442                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
443                 '%Y-%m-%d %H:%M:%S'))
444         else:
445             log_handler = None
446
447         if log_handler is not None:
448             arvados.logger.removeHandler(arvados.log_handler)
449             arvados.logger.addHandler(log_handler)
450
451         if self.args.debug:
452             arvados.logger.setLevel(logging.DEBUG)
453             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
454             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
455             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
456             self.logger.debug("arv-mount debugging enabled")
457
458         self.logger.info("%s %s started", sys.argv[0], __version__)
459         self.logger.info("enable write is %s", self.args.enable_write)
460
461     def _setup_api(self):
462         try:
463             # default value of file_cache is 0, this tells KeepBlockCache to
464             # choose a default based on whether disk_cache is enabled or not.
465
466             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
467                                                       disk_cache=self.args.disk_cache,
468                                                       disk_cache_dir=self.args.disk_cache_dir)
469
470             # If there's too many prefetch threads and you
471             # max out the CPU, delivering data to the FUSE
472             # layer actually ends up being slower.
473             # Experimentally, capping 7 threads seems to
474             # be a sweet spot.
475             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
476
477             self.api = arvados.safeapi.ThreadSafeApiCache(
478                 apiconfig=arvados.config.settings(),
479                 api_params={
480                     'num_retries': self.args.retries,
481                 },
482                 keep_params={
483                     'block_cache': block_cache,
484                     'num_prefetch_threads': prefetch_threads,
485                     'num_retries': self.args.retries,
486                 },
487                 version='v1',
488             )
489         except KeyError as e:
490             self.logger.error("Missing environment: %s", e)
491             exit(1)
492         # Do a sanity check that we have a working arvados host + token.
493         self.api.users().current().execute()
494
495     def _setup_mount(self):
496         self.operations = Operations(
497             os.getuid(),
498             os.getgid(),
499             api_client=self.api,
500             encoding=self.args.encoding,
501             inode_cache=InodeCache(cap=self.args.directory_cache),
502             enable_write=self.args.enable_write)
503
504         if self.args.crunchstat_interval:
505             statsthread = threading.Thread(
506                 target=crunchstat.statlogger,
507                 args=(self.args.crunchstat_interval,
508                       self.api.keep,
509                       self.operations))
510             statsthread.daemon = True
511             statsthread.start()
512
513         usr = self.api.users().current().execute(num_retries=self.args.retries)
514         now = time.time()
515         dir_class = None
516         dir_args = [
517             llfuse.ROOT_INODE,
518             self.operations.inodes,
519             self.api,
520             self.args.retries,
521             self.args.enable_write,
522             self.args.filters,
523         ]
524         mount_readme = False
525
526         storage_classes = None
527         if self.args.storage_classes is not None:
528             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
529             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
530
531         if self.args.collection is not None:
532             # Set up the request handler with the collection at the root
533             # First check that the collection is readable
534             self.api.collections().get(uuid=self.args.collection).execute()
535             self.args.mode = 'collection'
536             dir_class = CollectionDirectory
537             dir_args.append(self.args.collection)
538         elif self.args.project is not None:
539             self.args.mode = 'project'
540             dir_class = ProjectDirectory
541             dir_args.append(
542                 self.api.groups().get(uuid=self.args.project).execute(
543                     num_retries=self.args.retries))
544
545         if (self.args.mount_by_id or
546             self.args.mount_by_pdh or
547             self.args.mount_by_tag or
548             self.args.mount_home or
549             self.args.mount_shared or
550             self.args.mount_tmp):
551             if self.args.mode is not None:
552                 sys.exit(
553                     "Cannot combine '{}' mode with custom --mount-* options.".
554                     format(self.args.mode))
555         elif self.args.mode is None:
556             # If no --mount-custom or custom mount args, --all is the default
557             self.args.mode = 'all'
558
559         if self.args.mode in ['by_id', 'by_pdh']:
560             # Set up the request handler with the 'magic directory' at the root
561             dir_class = MagicDirectory
562             dir_args.append(self.args.mode == 'by_pdh')
563         elif self.args.mode == 'by_tag':
564             dir_class = TagsDirectory
565         elif self.args.mode == 'shared':
566             dir_class = SharedDirectory
567             dir_args.append(usr)
568         elif self.args.mode == 'home':
569             dir_class = ProjectDirectory
570             dir_args.append(usr)
571             dir_args.append(True)
572         elif self.args.mode == 'all':
573             self.args.mount_by_id = ['by_id']
574             self.args.mount_by_tag = ['by_tag']
575             self.args.mount_home = ['home']
576             self.args.mount_shared = ['shared']
577             mount_readme = True
578
579         if dir_class is not None:
580             if dir_class in [TagsDirectory, CollectionDirectory]:
581                 ent = dir_class(*dir_args)
582             else:
583                 ent = dir_class(*dir_args, storage_classes=storage_classes)
584             self.operations.inodes.add_entry(ent)
585             self.listen_for_events = ent.want_event_subscribe()
586             return
587
588         e = self.operations.inodes.add_entry(Directory(
589             llfuse.ROOT_INODE,
590             self.operations.inodes,
591             self.api.config,
592             self.args.enable_write,
593             self.args.filters,
594         ))
595         dir_args[0] = e.inode
596
597         for name in self.args.mount_by_id:
598             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
599         for name in self.args.mount_by_pdh:
600             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
601         for name in self.args.mount_by_tag:
602             self._add_mount(e, name, TagsDirectory(*dir_args))
603         for name in self.args.mount_home:
604             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
605         for name in self.args.mount_shared:
606             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
607         for name in self.args.mount_tmp:
608             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
609
610         if mount_readme:
611             text = self._readme_text(
612                 arvados.config.get('ARVADOS_API_HOST'),
613                 usr['email'])
614             self._add_mount(e, 'README', StringFile(e.inode, text, now))
615
616     def _add_mount(self, tld, name, ent):
617         if name in ['', '.', '..'] or '/' in name:
618             sys.exit("Mount point '{}' is not supported.".format(name))
619         tld._entries[name] = self.operations.inodes.add_entry(ent)
620         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
621
622     def _readme_text(self, api_host, user_email):
623         return '''
624 Welcome to Arvados!  This directory provides file system access to
625 files and objects available on the Arvados installation located at
626 '{}' using credentials for user '{}'.
627
628 From here, the following directories are available:
629
630   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
631   by_tag/    Access to Keep collections organized by tag.
632   home/      The contents of your home project.
633   shared/    Projects shared with you.
634
635 '''.format(api_host, user_email)
636
637     def _run_exec(self):
638         rc = 255
639         with self:
640             try:
641                 sp = subprocess.Popen(self.args.exec_args, shell=False)
642
643                 # forward signals to the process.
644                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
645                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
646                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
647
648                 # wait for process to complete.
649                 rc = sp.wait()
650
651                 # restore default signal handlers.
652                 signal.signal(signal.SIGINT, signal.SIG_DFL)
653                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
654                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
655             except Exception as e:
656                 self.logger.exception(
657                     'arv-mount: exception during exec %s', self.args.exec_args)
658                 try:
659                     rc = e.errno
660                 except AttributeError:
661                     pass
662         exit(rc)
663
664     def _run_standalone(self):
665         try:
666             self.daemon = not self.args.foreground
667             with self:
668                 self.llfuse_thread.join(timeout=None)
669         except Exception as e:
670             self.logger.exception('arv-mount: exception during mount: %s', e)
671             exit(getattr(e, 'errno', 1))
672         exit(0)
673
674     def _llfuse_main(self):
675         try:
676             llfuse.main()
677         except:
678             llfuse.close(unmount=False)
679             raise
680         llfuse.close()