Merge branch '15317-metrics'
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="Interact with Arvados data through a local filesystem",
32         )
33         self.add_argument(
34             '--version',
35             action='version',
36             version=u"%s %s" % (sys.argv[0], __version__),
37             help="Print version and exit",
38         )
39         self.add_argument(
40             'mountpoint',
41             metavar='MOUNT_DIR',
42             help="Directory path to mount data",
43         )
44
45         mode_group = self.add_argument_group("Mount contents")
46         mode = mode_group.add_mutually_exclusive_group()
47         mode.add_argument(
48             '--all',
49             action='store_const',
50             const='all',
51             dest='mode',
52             help="""
53 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
54 (default if no `--mount-*` options are given)
55 """,
56         )
57         mode.add_argument(
58             '--custom',
59             action='store_const',
60             const=None,
61             dest='mode',
62             help="""
63 Mount a subdirectory for each mode specified by a `--mount-*` option
64 (default if any `--mount-*` options are given;
65 see "Mount custom layout and filtering" section)
66 """,
67         )
68         mode.add_argument(
69             '--collection',
70             metavar='UUID_OR_PDH',
71             help="Mount the specified collection",
72         )
73         mode.add_argument(
74             '--home',
75             action='store_const',
76             const='home',
77             dest='mode',
78             help="Mount your home project",
79         )
80         mode.add_argument(
81             '--project',
82             metavar='UUID',
83             help="Mount the specified project",
84         )
85         mode.add_argument(
86             '--shared',
87             action='store_const',
88             const='shared',
89             dest='mode',
90             help="Mount a subdirectory for each project shared with you",
91         )
92         mode.add_argument(
93             '--by-id',
94             action='store_const',
95             const='by_id',
96             dest='mode',
97             help="""
98 Mount a magic directory where collections and projects are accessible through
99 subdirectories named after their UUID or portable data hash
100 """,
101         )
102         mode.add_argument(
103             '--by-pdh',
104             action='store_const',
105             const='by_pdh',
106             dest='mode',
107             help="""
108 Mount a magic directory where collections are accessible through
109 subdirectories named after their portable data hash
110 """,
111         )
112         mode.add_argument(
113             '--by-tag',
114             action='store_const',
115             const='by_tag',
116             dest='mode',
117             help="Mount a subdirectory for each tag attached to a collection or project",
118         )
119
120         mounts = self.add_argument_group("Mount custom layout and filtering")
121         mounts.add_argument(
122             '--filters',
123             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
124             help="""
125 Filters to apply to all project, shared, and tag directory contents.
126 Pass filters as either a JSON string or a path to a JSON file.
127 The JSON object should be a list of filters in Arvados API list filter syntax.
128 """,
129         )
130         mounts.add_argument(
131             '--mount-home',
132             metavar='PATH',
133             action='append',
134             default=[],
135             help="Make your home project available under the mount at `PATH`",
136         )
137         mounts.add_argument(
138             '--mount-shared',
139             metavar='PATH',
140             action='append',
141             default=[],
142             help="Make projects shared with you available under the mount at `PATH`",
143         )
144         mounts.add_argument(
145             '--mount-tmp',
146             metavar='PATH',
147             action='append',
148             default=[],
149             help="""
150 Make a new temporary writable collection available under the mount at `PATH`.
151 This collection is deleted when the mount is unmounted.
152 """,
153         )
154         mounts.add_argument(
155             '--mount-by-id',
156             metavar='PATH',
157             action='append',
158             default=[],
159             help="""
160 Make a magic directory available under the mount at `PATH` where collections and
161 projects are accessible through subdirectories named after their UUID or
162 portable data hash
163 """,
164         )
165         mounts.add_argument(
166             '--mount-by-pdh',
167             metavar='PATH',
168             action='append',
169             default=[],
170             help="""
171 Make a magic directory available under the mount at `PATH` where collections
172 are accessible through subdirectories named after portable data hash
173 """,
174         )
175         mounts.add_argument(
176             '--mount-by-tag',
177             metavar='PATH',
178             action='append',
179             default=[],
180             help="""
181 Make a subdirectory for each tag attached to a collection or project available
182 under the mount at `PATH`
183 """ ,
184         )
185
186         perms = self.add_argument_group("Mount access and permissions")
187         perms.add_argument(
188             '--allow-other',
189             action='store_true',
190             help="Let other users on this system read mounted data (default false)",
191         )
192         perms.add_argument(
193             '--read-only',
194             action='store_false',
195             default=False,
196             dest='enable_write',
197             help="Mounted data cannot be modified from the mount (default)",
198         )
199         perms.add_argument(
200             '--read-write',
201             action='store_true',
202             default=False,
203             dest='enable_write',
204             help="Mounted data can be modified from the mount",
205         )
206
207         lifecycle = self.add_argument_group("Mount lifecycle management")
208         lifecycle.add_argument(
209             '--exec',
210             nargs=argparse.REMAINDER,
211             dest="exec_args",
212             help="""
213 Mount data, run the specified command, then unmount and exit.
214 `--exec` reads all remaining options as the command to run,
215 so it must be the last option you specify.
216 Either end your command arguments (and other options) with a `--` argument,
217 or specify `--exec` after your mount point.
218 """,
219         )
220         lifecycle.add_argument(
221             '--foreground',
222             action='store_true',
223             default=False,
224             help="Run mount process in the foreground instead of daemonizing (default false)",
225         )
226         lifecycle.add_argument(
227             '--subtype',
228             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
229         )
230         unmount = lifecycle.add_mutually_exclusive_group()
231         unmount.add_argument(
232             '--replace',
233             action='store_true',
234             default=False,
235             help="""
236 If a FUSE mount is already mounted at the given directory,
237 unmount it before mounting the requested data.
238 If `--subtype` is specified, unmount only if the mount has that subtype.
239 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
240 """,
241         )
242         unmount.add_argument(
243             '--unmount',
244             action='store_true',
245             default=False,
246             help="""
247 If a FUSE mount is already mounted at the given directory, unmount it and exit.
248 If `--subtype` is specified, unmount only if the mount has that subtype.
249 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
250 """,
251         )
252         unmount.add_argument(
253             '--unmount-all',
254             action='store_true',
255             default=False,
256             help="""
257 Unmount all FUSE mounts at or below the given directory, then exit.
258 If `--subtype` is specified, unmount only if the mount has that subtype.
259 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
260 """,
261         )
262         lifecycle.add_argument(
263             '--unmount-timeout',
264             type=float,
265             default=2.0,
266             metavar='SECONDS',
267             help="""
268 The number of seconds to wait for a clean unmount after an `--exec` command has
269 exited (default %(default).01f).
270 After this time, the mount will be forcefully unmounted.
271 """,
272         )
273
274         reporting = self.add_argument_group("Mount logging and statistics")
275         reporting.add_argument(
276             '--crunchstat-interval',
277             type=float,
278             default=0.0,
279             metavar='SECONDS',
280             help="Write stats to stderr every N seconds (default disabled)",
281         )
282         reporting.add_argument(
283             '--debug',
284             action='store_true',
285             help="Log debug information",
286         )
287         reporting.add_argument(
288             '--logfile',
289             help="Write debug logs and errors to the specified file (default stderr)",
290         )
291
292         cache = self.add_argument_group("Mount local cache setup")
293         cachetype = cache.add_mutually_exclusive_group()
294         cachetype.add_argument(
295             '--disk-cache',
296             action='store_true',
297             default=True,
298             dest='disk_cache',
299             help="Cache data on the local filesystem (default)",
300         )
301         cachetype.add_argument(
302             '--ram-cache',
303             action='store_false',
304             default=True,
305             dest='disk_cache',
306             help="Cache data in memory",
307         )
308         cache.add_argument(
309             '--disk-cache-dir',
310             metavar="DIRECTORY",
311             help="Filesystem cache location (default `~/.cache/arvados/keep`)",
312         )
313         cache.add_argument(
314             '--directory-cache',
315             type=int,
316             default=128*1024*1024,
317             metavar='BYTES',
318             help="Size of directory data cache in bytes (default 128 MiB)",
319         )
320         cache.add_argument(
321             '--file-cache',
322             type=int,
323             default=0,
324             metavar='BYTES',
325             help="""
326 Size of file data cache in bytes
327 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
328 """,
329         )
330
331         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
332         plumbing.add_argument(
333             '--disable-event-listening',
334             action='store_true',
335             dest='disable_event_listening',
336             default=False,
337             help="Don't subscribe to events on the API server to update mount contents",
338         )
339         plumbing.add_argument(
340             '--encoding',
341             default="utf-8",
342             help="""
343 Filesystem character encoding
344 (default %(default)r; specify a name from the Python codec registry)
345 """,
346         )
347         plumbing.add_argument(
348             '--storage-classes',
349             metavar='CLASSES',
350             help="Comma-separated list of storage classes to request for new collections",
351         )
352
353
354 class Mount(object):
355     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
356         self.daemon = False
357         self.logger = logger
358         self.args = args
359         self.listen_for_events = False
360
361         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
362         if self.args.logfile:
363             self.args.logfile = os.path.realpath(self.args.logfile)
364
365         try:
366             self._setup_logging()
367         except Exception as e:
368             self.logger.exception("exception during setup: %s", e)
369             exit(1)
370
371         try:
372             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
373
374             minlimit = 10240
375             if self.args.file_cache:
376                 # Adjust the file handle limit so it can meet
377                 # the desired cache size. Multiply by 8 because the
378                 # number of 64 MiB cache slots that keepclient
379                 # allocates is RLIMIT_NOFILE / 8
380                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
381
382             if nofile_limit[0] < minlimit:
383                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
384
385             if minlimit > nofile_limit[1]:
386                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
387
388         except Exception as e:
389             self.logger.warning("unable to adjust file handle limit: %s", e)
390
391         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
392         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
393
394         try:
395             self._setup_api()
396             self._setup_mount()
397         except Exception as e:
398             self.logger.exception("exception during setup: %s", e)
399             exit(1)
400
401     def __enter__(self):
402         if self.args.replace:
403             unmount(path=self.args.mountpoint,
404                     timeout=self.args.unmount_timeout)
405         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
406         if self.daemon:
407             daemon.DaemonContext(
408                 working_directory=os.path.dirname(self.args.mountpoint),
409                 files_preserve=list(range(
410                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
411             ).open()
412         if self.listen_for_events and not self.args.disable_event_listening:
413             self.operations.listen_for_events()
414         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
415         self.llfuse_thread.daemon = True
416         self.llfuse_thread.start()
417         self.operations.initlock.wait()
418         return self
419
420     def __exit__(self, exc_type, exc_value, traceback):
421         if self.operations.events:
422             self.operations.events.close(timeout=self.args.unmount_timeout)
423         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
424         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
425         if self.llfuse_thread.is_alive():
426             self.logger.warning("Mount.__exit__:"
427                                 " llfuse thread still alive %fs after umount"
428                                 " -- abandoning and exiting anyway",
429                                 self.args.unmount_timeout)
430
431     def run(self):
432         if self.args.unmount or self.args.unmount_all:
433             unmount(path=self.args.mountpoint,
434                     subtype=self.args.subtype,
435                     timeout=self.args.unmount_timeout,
436                     recursive=self.args.unmount_all)
437         elif self.args.exec_args:
438             self._run_exec()
439         else:
440             self._run_standalone()
441
442     def _fuse_options(self):
443         """FUSE mount options; see mount.fuse(8)"""
444         opts = [optname for optname in ['allow_other', 'debug']
445                 if getattr(self.args, optname)]
446         # Increase default read/write size from 4KiB to 128KiB
447         opts += ["big_writes", "max_read=131072"]
448         if self.args.subtype:
449             opts += ["subtype="+self.args.subtype]
450         return opts
451
452     def _setup_logging(self):
453         # Configure a log handler based on command-line switches.
454         if self.args.logfile:
455             log_handler = logging.FileHandler(self.args.logfile)
456             log_handler.setFormatter(logging.Formatter(
457                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
458                 '%Y-%m-%d %H:%M:%S'))
459         else:
460             log_handler = None
461
462         if log_handler is not None:
463             arvados.logger.removeHandler(arvados.log_handler)
464             arvados.logger.addHandler(log_handler)
465
466         if self.args.debug:
467             arvados.logger.setLevel(logging.DEBUG)
468             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
469             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
470             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
471             self.logger.debug("arv-mount debugging enabled")
472
473         self.logger.info("%s %s started", sys.argv[0], __version__)
474         self.logger.info("enable write is %s", self.args.enable_write)
475
476     def _setup_api(self):
477         try:
478             # default value of file_cache is 0, this tells KeepBlockCache to
479             # choose a default based on whether disk_cache is enabled or not.
480
481             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
482                                                       disk_cache=self.args.disk_cache,
483                                                       disk_cache_dir=self.args.disk_cache_dir)
484
485             # If there's too many prefetch threads and you
486             # max out the CPU, delivering data to the FUSE
487             # layer actually ends up being slower.
488             # Experimentally, capping 7 threads seems to
489             # be a sweet spot.
490             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
491
492             self.api = arvados.safeapi.ThreadSafeApiCache(
493                 apiconfig=arvados.config.settings(),
494                 api_params={
495                     'num_retries': self.args.retries,
496                 },
497                 keep_params={
498                     'block_cache': block_cache,
499                     'num_prefetch_threads': prefetch_threads,
500                     'num_retries': self.args.retries,
501                 },
502                 version='v1',
503             )
504         except KeyError as e:
505             self.logger.error("Missing environment: %s", e)
506             exit(1)
507         # Do a sanity check that we have a working arvados host + token.
508         self.api.users().current().execute()
509
510     def _setup_mount(self):
511         self.operations = Operations(
512             os.getuid(),
513             os.getgid(),
514             api_client=self.api,
515             encoding=self.args.encoding,
516             inode_cache=InodeCache(cap=self.args.directory_cache),
517             enable_write=self.args.enable_write)
518
519         if self.args.crunchstat_interval:
520             statsthread = threading.Thread(
521                 target=crunchstat.statlogger,
522                 args=(self.args.crunchstat_interval,
523                       self.api.keep,
524                       self.operations))
525             statsthread.daemon = True
526             statsthread.start()
527
528         usr = self.api.users().current().execute(num_retries=self.args.retries)
529         now = time.time()
530         dir_class = None
531         dir_args = [
532             llfuse.ROOT_INODE,
533             self.operations.inodes,
534             self.api,
535             self.args.retries,
536             self.args.enable_write,
537             self.args.filters,
538         ]
539         mount_readme = False
540
541         storage_classes = None
542         if self.args.storage_classes is not None:
543             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
544             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
545
546         if self.args.collection is not None:
547             # Set up the request handler with the collection at the root
548             # First check that the collection is readable
549             self.api.collections().get(uuid=self.args.collection).execute()
550             self.args.mode = 'collection'
551             dir_class = CollectionDirectory
552             dir_args.append(self.args.collection)
553         elif self.args.project is not None:
554             self.args.mode = 'project'
555             dir_class = ProjectDirectory
556             dir_args.append(
557                 self.api.groups().get(uuid=self.args.project).execute(
558                     num_retries=self.args.retries))
559
560         if (self.args.mount_by_id or
561             self.args.mount_by_pdh or
562             self.args.mount_by_tag or
563             self.args.mount_home or
564             self.args.mount_shared or
565             self.args.mount_tmp):
566             if self.args.mode is not None:
567                 sys.exit(
568                     "Cannot combine '{}' mode with custom --mount-* options.".
569                     format(self.args.mode))
570         elif self.args.mode is None:
571             # If no --mount-custom or custom mount args, --all is the default
572             self.args.mode = 'all'
573
574         if self.args.mode in ['by_id', 'by_pdh']:
575             # Set up the request handler with the 'magic directory' at the root
576             dir_class = MagicDirectory
577             dir_args.append(self.args.mode == 'by_pdh')
578         elif self.args.mode == 'by_tag':
579             dir_class = TagsDirectory
580         elif self.args.mode == 'shared':
581             dir_class = SharedDirectory
582             dir_args.append(usr)
583         elif self.args.mode == 'home':
584             dir_class = ProjectDirectory
585             dir_args.append(usr)
586             dir_args.append(True)
587         elif self.args.mode == 'all':
588             self.args.mount_by_id = ['by_id']
589             self.args.mount_by_tag = ['by_tag']
590             self.args.mount_home = ['home']
591             self.args.mount_shared = ['shared']
592             mount_readme = True
593
594         if dir_class is not None:
595             if dir_class in [TagsDirectory, CollectionDirectory]:
596                 ent = dir_class(*dir_args)
597             else:
598                 ent = dir_class(*dir_args, storage_classes=storage_classes)
599             self.operations.inodes.add_entry(ent)
600             self.listen_for_events = ent.want_event_subscribe()
601             return
602
603         e = self.operations.inodes.add_entry(Directory(
604             llfuse.ROOT_INODE,
605             self.operations.inodes,
606             self.api.config,
607             self.args.enable_write,
608             self.args.filters,
609         ))
610         dir_args[0] = e.inode
611
612         for name in self.args.mount_by_id:
613             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
614         for name in self.args.mount_by_pdh:
615             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
616         for name in self.args.mount_by_tag:
617             self._add_mount(e, name, TagsDirectory(*dir_args))
618         for name in self.args.mount_home:
619             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
620         for name in self.args.mount_shared:
621             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
622         for name in self.args.mount_tmp:
623             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
624
625         if mount_readme:
626             text = self._readme_text(
627                 arvados.config.get('ARVADOS_API_HOST'),
628                 usr['email'])
629             self._add_mount(e, 'README', StringFile(e.inode, text, now))
630
631     def _add_mount(self, tld, name, ent):
632         if name in ['', '.', '..'] or '/' in name:
633             sys.exit("Mount point '{}' is not supported.".format(name))
634         tld._entries[name] = self.operations.inodes.add_entry(ent)
635         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
636
637     def _readme_text(self, api_host, user_email):
638         return '''
639 Welcome to Arvados!  This directory provides file system access to
640 files and objects available on the Arvados installation located at
641 '{}' using credentials for user '{}'.
642
643 From here, the following directories are available:
644
645   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
646   by_tag/    Access to Keep collections organized by tag.
647   home/      The contents of your home project.
648   shared/    Projects shared with you.
649
650 '''.format(api_host, user_email)
651
652     def _run_exec(self):
653         rc = 255
654         with self:
655             try:
656                 sp = subprocess.Popen(self.args.exec_args, shell=False)
657
658                 # forward signals to the process.
659                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
660                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
661                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
662
663                 # wait for process to complete.
664                 rc = sp.wait()
665
666                 # restore default signal handlers.
667                 signal.signal(signal.SIGINT, signal.SIG_DFL)
668                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
669                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
670             except Exception as e:
671                 self.logger.exception(
672                     'arv-mount: exception during exec %s', self.args.exec_args)
673                 try:
674                     rc = e.errno
675                 except AttributeError:
676                     pass
677         exit(rc)
678
679     def _run_standalone(self):
680         try:
681             self.daemon = not self.args.foreground
682             with self:
683                 self.llfuse_thread.join(timeout=None)
684         except Exception as e:
685             self.logger.exception('arv-mount: exception during mount: %s', e)
686             exit(getattr(e, 'errno', 1))
687         exit(0)
688
689     def _llfuse_main(self):
690         try:
691             llfuse.main()
692         except:
693             llfuse.close(unmount=False)
694             raise
695         llfuse.close()