21504: Style arv-mount argument creation consistently
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="""
32 Mount Keep data under the local filesystem.  Default mode is --home
33 """,
34             epilog="""
35     Note: When using the --exec feature, you must either specify the
36     mountpoint before --exec, or mark the end of your --exec arguments
37     with "--".
38             """)
39         self.add_argument(
40             '--version',
41             action='version',
42             version=u"%s %s" % (sys.argv[0], __version__),
43             help='Print version and exit.',
44         )
45         self.add_argument(
46             'mountpoint',
47             help="""Mount point.""",
48         )
49         self.add_argument(
50             '--allow-other',
51             action='store_true',
52             help="""Let other users read the mount""",
53         )
54         self.add_argument(
55             '--subtype',
56             metavar='STRING',
57             help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""",
58         )
59
60         mode = self.add_mutually_exclusive_group()
61         mode.add_argument(
62             '--all',
63             action='store_const',
64             const='all',
65             dest='mode',
66             help="""
67 Mount a subdirectory for each mode: home, shared, by_tag, by_id
68 (default if no --mount-* arguments are given).
69 """,
70         )
71         mode.add_argument(
72             '--custom',
73             action='store_const',
74             const=None,
75             dest='mode',
76             help="""
77 Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments
78 (default if any --mount-* arguments are given).
79 """,
80         )
81         mode.add_argument(
82             '--home',
83             action='store_const',
84             const='home',
85             dest='mode',
86             help="""Mount only the user's home project.""",
87         )
88         mode.add_argument(
89             '--shared',
90             action='store_const',
91             const='shared',
92             dest='mode',
93             help="""Mount only list of projects shared with the user.""",
94         )
95         mode.add_argument(
96             '--by-tag',
97             action='store_const',
98             const='by_tag',
99             dest='mode',
100             help="""Mount subdirectories listed by tag.""",
101         )
102         mode.add_argument(
103             '--by-id',
104             action='store_const',
105             const='by_id',
106             dest='mode',
107             help="""Mount subdirectories listed by portable data hash or uuid.""",
108         )
109         mode.add_argument(
110             '--by-pdh',
111             action='store_const',
112             const='by_pdh',
113             dest='mode',
114             help="""Mount subdirectories listed by portable data hash.""",
115         )
116         mode.add_argument(
117             '--project',
118             metavar='UUID',
119             help="""Mount the specified project.""",
120         )
121         mode.add_argument(
122             '--collection',
123             metavar='UUID_or_PDH',
124             help="""Mount only the specified collection.""",
125         )
126
127         mounts = self.add_argument_group('Custom mount options')
128         mounts.add_argument(
129             '--mount-by-pdh',
130             metavar='PATH',
131             action='append',
132             default=[],
133             help="""
134 Mount each readable collection at mountpoint/PATH/P
135 where P is the collection's portable data hash.
136 """,
137         )
138         mounts.add_argument(
139             '--mount-by-id',
140             metavar='PATH',
141             action='append',
142             default=[],
143             help="""
144 Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH
145 where PDH is the collection's portable data hash and UUID is its UUID.
146 """,
147         )
148         mounts.add_argument(
149             '--mount-by-tag',
150             metavar='PATH',
151             action='append',
152             default=[],
153             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.",
154         )
155         mounts.add_argument(
156             '--mount-home',
157             metavar='PATH',
158             action='append',
159             default=[],
160             help="Mount the current user's home project at mountpoint/PATH.",
161         )
162         mounts.add_argument(
163             '--mount-shared',
164             metavar='PATH',
165             action='append',
166             default=[],
167             help="Mount projects shared with the current user at mountpoint/PATH.",
168         )
169         mounts.add_argument(
170             '--mount-tmp',
171             metavar='PATH',
172             action='append',
173             default=[],
174             help="""
175 Create a new collection, mount it in read/write mode at mountpoint/PATH,
176 and delete it when unmounting.
177 """,
178         )
179
180         self.add_argument(
181             '--debug',
182             action='store_true',
183             help="""Debug mode""",
184         )
185         self.add_argument(
186             '--logfile',
187             help="""Write debug logs and errors to the specified file (default stderr).""",
188         )
189         self.add_argument(
190             '--foreground',
191             action='store_true',
192             default=False,
193             help="""Run in foreground (default is to daemonize unless --exec specified)""",
194         )
195         self.add_argument(
196             '--encoding',
197             default="utf-8",
198             help="""
199 Character encoding to use for filesystem, default is utf-8
200 (see Python codec registry for list of available encodings)"
201 """,
202         )
203
204         self.add_argument(
205             '--file-cache',
206             type=int,
207             default=0,
208             help="""
209 File data cache size, in bytes
210 (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)
211 """,
212         )
213         self.add_argument(
214             '--directory-cache',
215             type=int,
216             default=128*1024*1024,
217             help="Directory data cache size, in bytes (default 128 MiB)",
218         )
219
220         cachetype = self.add_mutually_exclusive_group()
221         cachetype.add_argument(
222             '--ram-cache',
223             action='store_false',
224             default=True,
225             dest='disk_cache',
226             help="Use in-memory caching only",
227         )
228         cachetype.add_argument(
229             '--disk-cache',
230             action='store_true',
231             default=True,
232             dest='disk_cache',
233             help="Use disk based caching (default)",
234         )
235
236         self.add_argument(
237             '--disk-cache-dir',
238             help="Disk cache location (default ~/.cache/arvados/keep)",
239         )
240
241         self.add_argument(
242             '--disable-event-listening',
243             action='store_true',
244             dest='disable_event_listening',
245             default=False,
246             help="Don't subscribe to events on the API server",
247         )
248
249         self.add_argument(
250             '--read-only',
251             action='store_false',
252             default=False,
253             dest='enable_write',
254             help="Mount will be read only (default)",
255         )
256         self.add_argument(
257             '--read-write',
258             action='store_true',
259             default=False,
260             dest='enable_write',
261             help="Mount will be read-write",
262         )
263         self.add_argument(
264             '--storage-classes',
265             metavar='CLASSES',
266             help="Specify comma separated list of storage classes to be used when saving data of new collections",
267         )
268
269         self.add_argument(
270             '--crunchstat-interval',
271             type=float,
272             default=0.0,
273             help="Write stats to stderr every N seconds (default disabled)",
274         )
275
276         unmount = self.add_mutually_exclusive_group()
277         unmount.add_argument(
278             '--unmount',
279             action='store_true',
280             default=False,
281             help="""
282 Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.
283 If --subtype is given, unmount only if the mount has the specified subtype.
284 WARNING: This command can affect any kind of fuse mount, not just arv-mount.
285 """,
286         )
287         unmount.add_argument(
288             '--unmount-all',
289             action='store_true',
290             default=False,
291             help="""
292 Forcefully unmount every fuse mount at or below the specified path and exit.
293 If --subtype is given, unmount only mounts that have the specified subtype.
294 Exit non-zero if any other types of mounts are found at or below the given path.
295 WARNING: This command can affect any kind of fuse mount, not just arv-mount.
296 """,
297         )
298         unmount.add_argument(
299             '--replace',
300             action='store_true',
301             default=False,
302             help="""
303 If a fuse mount is already present at mountpoint, forcefully unmount it before mounting
304 """,
305         )
306         self.add_argument(
307             '--unmount-timeout',
308             type=float,
309             default=2.0,
310             help="""
311 Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted
312 """,
313         )
314         self.add_argument(
315             '--filters',
316             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
317             help="""
318 Filters to apply to all project, shared, and tag directory contents.
319 Pass filters as either a JSON string or a path to a JSON file.
320 The JSON object should be a list of filters in Arvados API list filter syntax.
321 """,
322         )
323         self.add_argument(
324             '--exec',
325             nargs=argparse.REMAINDER,
326             dest="exec_args",
327             metavar=('command', 'args', '...', '--'),
328             help="""Mount, run a command, then unmount and exit""",
329         )
330
331
332 class Mount(object):
333     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
334         self.daemon = False
335         self.logger = logger
336         self.args = args
337         self.listen_for_events = False
338
339         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
340         if self.args.logfile:
341             self.args.logfile = os.path.realpath(self.args.logfile)
342
343         try:
344             self._setup_logging()
345         except Exception as e:
346             self.logger.exception("exception during setup: %s", e)
347             exit(1)
348
349         try:
350             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
351
352             minlimit = 10240
353             if self.args.file_cache:
354                 # Adjust the file handle limit so it can meet
355                 # the desired cache size. Multiply by 8 because the
356                 # number of 64 MiB cache slots that keepclient
357                 # allocates is RLIMIT_NOFILE / 8
358                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
359
360             if nofile_limit[0] < minlimit:
361                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
362
363             if minlimit > nofile_limit[1]:
364                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
365
366         except Exception as e:
367             self.logger.warning("unable to adjust file handle limit: %s", e)
368
369         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
370         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
371
372         try:
373             self._setup_api()
374             self._setup_mount()
375         except Exception as e:
376             self.logger.exception("exception during setup: %s", e)
377             exit(1)
378
379     def __enter__(self):
380         if self.args.replace:
381             unmount(path=self.args.mountpoint,
382                     timeout=self.args.unmount_timeout)
383         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
384         if self.daemon:
385             daemon.DaemonContext(
386                 working_directory=os.path.dirname(self.args.mountpoint),
387                 files_preserve=list(range(
388                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
389             ).open()
390         if self.listen_for_events and not self.args.disable_event_listening:
391             self.operations.listen_for_events()
392         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
393         self.llfuse_thread.daemon = True
394         self.llfuse_thread.start()
395         self.operations.initlock.wait()
396         return self
397
398     def __exit__(self, exc_type, exc_value, traceback):
399         if self.operations.events:
400             self.operations.events.close(timeout=self.args.unmount_timeout)
401         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
402         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
403         if self.llfuse_thread.is_alive():
404             self.logger.warning("Mount.__exit__:"
405                                 " llfuse thread still alive %fs after umount"
406                                 " -- abandoning and exiting anyway",
407                                 self.args.unmount_timeout)
408
409     def run(self):
410         if self.args.unmount or self.args.unmount_all:
411             unmount(path=self.args.mountpoint,
412                     subtype=self.args.subtype,
413                     timeout=self.args.unmount_timeout,
414                     recursive=self.args.unmount_all)
415         elif self.args.exec_args:
416             self._run_exec()
417         else:
418             self._run_standalone()
419
420     def _fuse_options(self):
421         """FUSE mount options; see mount.fuse(8)"""
422         opts = [optname for optname in ['allow_other', 'debug']
423                 if getattr(self.args, optname)]
424         # Increase default read/write size from 4KiB to 128KiB
425         opts += ["big_writes", "max_read=131072"]
426         if self.args.subtype:
427             opts += ["subtype="+self.args.subtype]
428         return opts
429
430     def _setup_logging(self):
431         # Configure a log handler based on command-line switches.
432         if self.args.logfile:
433             log_handler = logging.FileHandler(self.args.logfile)
434             log_handler.setFormatter(logging.Formatter(
435                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
436                 '%Y-%m-%d %H:%M:%S'))
437         else:
438             log_handler = None
439
440         if log_handler is not None:
441             arvados.logger.removeHandler(arvados.log_handler)
442             arvados.logger.addHandler(log_handler)
443
444         if self.args.debug:
445             arvados.logger.setLevel(logging.DEBUG)
446             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
447             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
448             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
449             self.logger.debug("arv-mount debugging enabled")
450
451         self.logger.info("%s %s started", sys.argv[0], __version__)
452         self.logger.info("enable write is %s", self.args.enable_write)
453
454     def _setup_api(self):
455         try:
456             # default value of file_cache is 0, this tells KeepBlockCache to
457             # choose a default based on whether disk_cache is enabled or not.
458
459             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
460                                                       disk_cache=self.args.disk_cache,
461                                                       disk_cache_dir=self.args.disk_cache_dir)
462
463             # If there's too many prefetch threads and you
464             # max out the CPU, delivering data to the FUSE
465             # layer actually ends up being slower.
466             # Experimentally, capping 7 threads seems to
467             # be a sweet spot.
468             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
469
470             self.api = arvados.safeapi.ThreadSafeApiCache(
471                 apiconfig=arvados.config.settings(),
472                 api_params={
473                     'num_retries': self.args.retries,
474                 },
475                 keep_params={
476                     'block_cache': block_cache,
477                     'num_prefetch_threads': prefetch_threads,
478                     'num_retries': self.args.retries,
479                 },
480                 version='v1',
481             )
482         except KeyError as e:
483             self.logger.error("Missing environment: %s", e)
484             exit(1)
485         # Do a sanity check that we have a working arvados host + token.
486         self.api.users().current().execute()
487
488     def _setup_mount(self):
489         self.operations = Operations(
490             os.getuid(),
491             os.getgid(),
492             api_client=self.api,
493             encoding=self.args.encoding,
494             inode_cache=InodeCache(cap=self.args.directory_cache),
495             enable_write=self.args.enable_write)
496
497         if self.args.crunchstat_interval:
498             statsthread = threading.Thread(
499                 target=crunchstat.statlogger,
500                 args=(self.args.crunchstat_interval,
501                       self.api.keep,
502                       self.operations))
503             statsthread.daemon = True
504             statsthread.start()
505
506         usr = self.api.users().current().execute(num_retries=self.args.retries)
507         now = time.time()
508         dir_class = None
509         dir_args = [
510             llfuse.ROOT_INODE,
511             self.operations.inodes,
512             self.api,
513             self.args.retries,
514             self.args.enable_write,
515             self.args.filters,
516         ]
517         mount_readme = False
518
519         storage_classes = None
520         if self.args.storage_classes is not None:
521             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
522             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
523
524         if self.args.collection is not None:
525             # Set up the request handler with the collection at the root
526             # First check that the collection is readable
527             self.api.collections().get(uuid=self.args.collection).execute()
528             self.args.mode = 'collection'
529             dir_class = CollectionDirectory
530             dir_args.append(self.args.collection)
531         elif self.args.project is not None:
532             self.args.mode = 'project'
533             dir_class = ProjectDirectory
534             dir_args.append(
535                 self.api.groups().get(uuid=self.args.project).execute(
536                     num_retries=self.args.retries))
537
538         if (self.args.mount_by_id or
539             self.args.mount_by_pdh or
540             self.args.mount_by_tag or
541             self.args.mount_home or
542             self.args.mount_shared or
543             self.args.mount_tmp):
544             if self.args.mode is not None:
545                 sys.exit(
546                     "Cannot combine '{}' mode with custom --mount-* options.".
547                     format(self.args.mode))
548         elif self.args.mode is None:
549             # If no --mount-custom or custom mount args, --all is the default
550             self.args.mode = 'all'
551
552         if self.args.mode in ['by_id', 'by_pdh']:
553             # Set up the request handler with the 'magic directory' at the root
554             dir_class = MagicDirectory
555             dir_args.append(self.args.mode == 'by_pdh')
556         elif self.args.mode == 'by_tag':
557             dir_class = TagsDirectory
558         elif self.args.mode == 'shared':
559             dir_class = SharedDirectory
560             dir_args.append(usr)
561         elif self.args.mode == 'home':
562             dir_class = ProjectDirectory
563             dir_args.append(usr)
564             dir_args.append(True)
565         elif self.args.mode == 'all':
566             self.args.mount_by_id = ['by_id']
567             self.args.mount_by_tag = ['by_tag']
568             self.args.mount_home = ['home']
569             self.args.mount_shared = ['shared']
570             mount_readme = True
571
572         if dir_class is not None:
573             if dir_class in [TagsDirectory, CollectionDirectory]:
574                 ent = dir_class(*dir_args)
575             else:
576                 ent = dir_class(*dir_args, storage_classes=storage_classes)
577             self.operations.inodes.add_entry(ent)
578             self.listen_for_events = ent.want_event_subscribe()
579             return
580
581         e = self.operations.inodes.add_entry(Directory(
582             llfuse.ROOT_INODE,
583             self.operations.inodes,
584             self.api.config,
585             self.args.enable_write,
586             self.args.filters,
587         ))
588         dir_args[0] = e.inode
589
590         for name in self.args.mount_by_id:
591             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
592         for name in self.args.mount_by_pdh:
593             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
594         for name in self.args.mount_by_tag:
595             self._add_mount(e, name, TagsDirectory(*dir_args))
596         for name in self.args.mount_home:
597             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
598         for name in self.args.mount_shared:
599             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
600         for name in self.args.mount_tmp:
601             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
602
603         if mount_readme:
604             text = self._readme_text(
605                 arvados.config.get('ARVADOS_API_HOST'),
606                 usr['email'])
607             self._add_mount(e, 'README', StringFile(e.inode, text, now))
608
609     def _add_mount(self, tld, name, ent):
610         if name in ['', '.', '..'] or '/' in name:
611             sys.exit("Mount point '{}' is not supported.".format(name))
612         tld._entries[name] = self.operations.inodes.add_entry(ent)
613         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
614
615     def _readme_text(self, api_host, user_email):
616         return '''
617 Welcome to Arvados!  This directory provides file system access to
618 files and objects available on the Arvados installation located at
619 '{}' using credentials for user '{}'.
620
621 From here, the following directories are available:
622
623   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
624   by_tag/    Access to Keep collections organized by tag.
625   home/      The contents of your home project.
626   shared/    Projects shared with you.
627
628 '''.format(api_host, user_email)
629
630     def _run_exec(self):
631         rc = 255
632         with self:
633             try:
634                 sp = subprocess.Popen(self.args.exec_args, shell=False)
635
636                 # forward signals to the process.
637                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
638                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
639                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
640
641                 # wait for process to complete.
642                 rc = sp.wait()
643
644                 # restore default signal handlers.
645                 signal.signal(signal.SIGINT, signal.SIG_DFL)
646                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
647                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
648             except Exception as e:
649                 self.logger.exception(
650                     'arv-mount: exception during exec %s', self.args.exec_args)
651                 try:
652                     rc = e.errno
653                 except AttributeError:
654                     pass
655         exit(rc)
656
657     def _run_standalone(self):
658         try:
659             self.daemon = not self.args.foreground
660             with self:
661                 self.llfuse_thread.join(timeout=None)
662         except Exception as e:
663             self.logger.exception('arv-mount: exception during mount: %s', e)
664             exit(getattr(e, 'errno', 1))
665         exit(0)
666
667     def _llfuse_main(self):
668         try:
669             llfuse.main()
670         except:
671             llfuse.close(unmount=False)
672             raise
673         llfuse.close()