29ace2e52e6f82b1b784049cd8fc05baeac75575
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="Interact with Arvados data through a local filesystem",
32         )
33         self.add_argument(
34             '--version',
35             action='version',
36             version=u"%s %s" % (sys.argv[0], __version__),
37             help="Print version and exit",
38         )
39         self.add_argument(
40             'mountpoint',
41             metavar='MOUNT_DIR',
42             help="Directory path to mount data",
43         )
44
45         mode_group = self.add_argument_group("Mount contents")
46         mode = mode_group.add_mutually_exclusive_group()
47         mode.add_argument(
48             '--all',
49             action='store_const',
50             const='all',
51             dest='mode',
52             help="""
53 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
54 (default if no `--mount-*` options are given)
55 """,
56         )
57         mode.add_argument(
58             '--custom',
59             action='store_const',
60             const=None,
61             dest='mode',
62             help="""
63 Mount a subdirectory for each mode specified by a `--mount-*` option
64 (default if any `--mount-*` options are given;
65 see "Mount custom layout and filtering" section)
66 """,
67         )
68         mode.add_argument(
69             '--collection',
70             metavar='UUID_OR_PDH',
71             help="Mount the specified collection",
72         )
73         mode.add_argument(
74             '--home',
75             action='store_const',
76             const='home',
77             dest='mode',
78             help="Mount your home project",
79         )
80         mode.add_argument(
81             '--project',
82             metavar='UUID',
83             help="Mount the specified project",
84         )
85         mode.add_argument(
86             '--shared',
87             action='store_const',
88             const='shared',
89             dest='mode',
90             help="Mount a subdirectory for each project shared with you",
91         )
92         mode.add_argument(
93             '--by-id',
94             action='store_const',
95             const='by_id',
96             dest='mode',
97             help="""
98 Mount a magic directory where collections and projects are accessible through
99 subdirectories named after their UUID or portable data hash
100 """,
101         )
102         mode.add_argument(
103             '--by-pdh',
104             action='store_const',
105             const='by_pdh',
106             dest='mode',
107             help="""
108 Mount a magic directory where collections are accessible through
109 subdirectories named after their portable data hash
110 """,
111         )
112         mode.add_argument(
113             '--by-tag',
114             action='store_const',
115             const='by_tag',
116             dest='mode',
117             help="Mount a subdirectory for each tag attached to a collection or project",
118         )
119
120         mounts = self.add_argument_group("Mount custom layout and filtering")
121         mounts.add_argument(
122             '--filters',
123             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
124             help="""
125 Filters to apply to all project, shared, and tag directory contents.
126 Pass filters as either a JSON string or a path to a JSON file.
127 The JSON object should be a list of filters in Arvados API list filter syntax.
128 """,
129         )
130         mounts.add_argument(
131             '--mount-home',
132             metavar='PATH',
133             action='append',
134             default=[],
135             help="Make your home project available under the mount at `PATH`",
136         )
137         mounts.add_argument(
138             '--mount-shared',
139             metavar='PATH',
140             action='append',
141             default=[],
142             help="Make projects shared with you available under the mount at `PATH`",
143         )
144         mounts.add_argument(
145             '--mount-tmp',
146             metavar='PATH',
147             action='append',
148             default=[],
149             help="""
150 Make a new temporary writable collection available under the mount at `PATH`.
151 This collection is deleted when the mount is unmounted.
152 """,
153         )
154         mounts.add_argument(
155             '--mount-by-id',
156             metavar='PATH',
157             action='append',
158             default=[],
159             help="""
160 Make a magic directory available under the mount at `PATH` where collections and
161 projects are accessible through subdirectories named after their UUID or
162 portable data hash
163 """,
164         )
165         mounts.add_argument(
166             '--mount-by-pdh',
167             metavar='PATH',
168             action='append',
169             default=[],
170             help="""
171 Make a magic directory available under the mount at `PATH` where collections
172 are accessible through subdirectories named after portable data hash
173 """,
174         )
175         mounts.add_argument(
176             '--mount-by-tag',
177             metavar='PATH',
178             action='append',
179             default=[],
180             help="""
181 Make a subdirectory for each tag attached to a collection or project available
182 under the mount at `PATH`
183 """ ,
184         )
185
186         perms = self.add_argument_group("Mount access and permissions")
187         perms.add_argument(
188             '--allow-other',
189             action='store_true',
190             help="Let other users on this system read mounted data (default false)",
191         )
192         perms.add_argument(
193             '--read-only',
194             action='store_false',
195             default=False,
196             dest='enable_write',
197             help="Mounted data cannot be modified from the mount (default)",
198         )
199         perms.add_argument(
200             '--read-write',
201             action='store_true',
202             default=False,
203             dest='enable_write',
204             help="Mounted data can be modified from the mount",
205         )
206
207         lifecycle = self.add_argument_group("Mount lifecycle management")
208         lifecycle.add_argument(
209             '--exec',
210             nargs=argparse.REMAINDER,
211             dest="exec_args",
212             help="""
213 Mount data, run the specified command, then unmount and exit.
214 `--exec` reads all remaining options as the command to run,
215 so it must be the last option you specify.
216 Either end your command arguments (and other options) with a `--` argument,
217 or specify `--exec` after your mount point.
218 """,
219         )
220         lifecycle.add_argument(
221             '--foreground',
222             action='store_true',
223             default=False,
224             help="Run mount process in the foreground instead of daemonizing (default false)",
225         )
226         lifecycle.add_argument(
227             '--subtype',
228             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
229         )
230         unmount = lifecycle.add_mutually_exclusive_group()
231         unmount.add_argument(
232             '--replace',
233             action='store_true',
234             default=False,
235             help="""
236 If a FUSE mount is already mounted at the given directory,
237 unmount it before mounting the requested data.
238 If `--subtype` is specified, unmount only if the mount has that subtype.
239 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
240 """,
241         )
242         unmount.add_argument(
243             '--unmount',
244             action='store_true',
245             default=False,
246             help="""
247 If a FUSE mount is already mounted at the given directory, unmount it and exit.
248 If `--subtype` is specified, unmount only if the mount has that subtype.
249 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
250 """,
251         )
252         unmount.add_argument(
253             '--unmount-all',
254             action='store_true',
255             default=False,
256             help="""
257 Unmount all FUSE mounts at or below the given directory, then exit.
258 If `--subtype` is specified, unmount only if the mount has that subtype.
259 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
260 """,
261         )
262         lifecycle.add_argument(
263             '--unmount-timeout',
264             type=float,
265             default=2.0,
266             metavar='SECONDS',
267             help="""
268 The number of seconds to wait for a clean unmount after an `--exec` command has
269 exited (default %(default).01f).
270 After this time, the mount will be forcefully unmounted.
271 """,
272         )
273
274         reporting = self.add_argument_group("Mount logging and statistics")
275         reporting.add_argument(
276             '--crunchstat-interval',
277             type=float,
278             default=0.0,
279             metavar='SECONDS',
280             help="Write stats to stderr every N seconds (default disabled)",
281         )
282         reporting.add_argument(
283             '--debug',
284             action='store_true',
285             help="Log debug information",
286         )
287         reporting.add_argument(
288             '--logfile',
289             help="Write debug logs and errors to the specified file (default stderr)",
290         )
291
292         cache = self.add_argument_group("Mount local cache setup")
293         cachetype = cache.add_mutually_exclusive_group()
294         cachetype.add_argument(
295             '--disk-cache',
296             action='store_true',
297             default=True,
298             dest='disk_cache',
299             help="Cache data on the local filesystem (default)",
300         )
301         cachetype.add_argument(
302             '--ram-cache',
303             action='store_false',
304             default=True,
305             dest='disk_cache',
306             help="Cache data in memory",
307         )
308         cache.add_argument(
309             '--disk-cache-dir',
310             metavar="DIRECTORY",
311             help="Filesystem cache location (default `~/.cache/arvados/keep`)",
312         )
313         cache.add_argument(
314             '--directory-cache',
315             type=int,
316             default=128*1024*1024,
317             metavar='BYTES',
318             help="Size of directory data cache in bytes (default 128 MiB)",
319         )
320         cache.add_argument(
321             '--file-cache',
322             type=int,
323             default=0,
324             metavar='BYTES',
325             help="""
326 Size of file data cache in bytes
327 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
328 """,
329         )
330
331         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
332         plumbing.add_argument(
333             '--disable-event-listening',
334             action='store_true',
335             dest='disable_event_listening',
336             default=False,
337             help="Don't subscribe to events on the API server to update mount contents",
338         )
339         plumbing.add_argument(
340             '--encoding',
341             default="utf-8",
342             help="""
343 Filesystem character encoding
344 (default %(default)r; specify a name from the Python codec registry)
345 """,
346         )
347         plumbing.add_argument(
348             '--storage-classes',
349             metavar='CLASSES',
350             help="Comma-separated list of storage classes to request for new collections",
351         )
352
353
354 class Mount(object):
355     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
356         self.daemon = False
357         self.logger = logger
358         self.args = args
359         self.listen_for_events = False
360
361         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
362         if self.args.logfile:
363             self.args.logfile = os.path.realpath(self.args.logfile)
364
365         try:
366             self._setup_logging()
367         except Exception as e:
368             self.logger.exception("exception during setup: %s", e)
369             exit(1)
370
371         try:
372             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
373
374             minlimit = 10240
375             if self.args.file_cache:
376                 # Adjust the file handle limit so it can meet
377                 # the desired cache size. Multiply by 8 because the
378                 # number of 64 MiB cache slots that keepclient
379                 # allocates is RLIMIT_NOFILE / 8
380                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
381
382             if nofile_limit[0] < minlimit:
383                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
384
385             if minlimit > nofile_limit[1]:
386                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
387
388         except Exception as e:
389             self.logger.warning("unable to adjust file handle limit: %s", e)
390
391         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
392         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
393
394         try:
395             self._setup_api()
396             self._setup_mount()
397         except Exception as e:
398             self.logger.exception("exception during setup: %s", e)
399             exit(1)
400
401     def __enter__(self):
402         if self.args.replace:
403             unmount(path=self.args.mountpoint,
404                     timeout=self.args.unmount_timeout)
405         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
406         if self.daemon:
407             daemon.DaemonContext(
408                 working_directory=os.path.dirname(self.args.mountpoint),
409                 files_preserve=list(range(
410                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
411             ).open()
412         if self.listen_for_events and not self.args.disable_event_listening:
413             self.operations.listen_for_events()
414         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
415         self.llfuse_thread.daemon = True
416         self.llfuse_thread.start()
417         self.operations.initlock.wait()
418         return self
419
420     def __exit__(self, exc_type, exc_value, traceback):
421         if self.operations.events:
422             self.operations.events.close(timeout=self.args.unmount_timeout)
423         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
424         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
425         if self.llfuse_thread.is_alive():
426             self.logger.warning("Mount.__exit__:"
427                                 " llfuse thread still alive %fs after umount"
428                                 " -- abandoning and exiting anyway",
429                                 self.args.unmount_timeout)
430
431     def run(self):
432         if self.args.unmount or self.args.unmount_all:
433             unmount(path=self.args.mountpoint,
434                     subtype=self.args.subtype,
435                     timeout=self.args.unmount_timeout,
436                     recursive=self.args.unmount_all)
437         elif self.args.exec_args:
438             self._run_exec()
439         else:
440             self._run_standalone()
441
442     def _fuse_options(self):
443         """FUSE mount options; see mount.fuse(8)"""
444         opts = [optname for optname in ['allow_other', 'debug']
445                 if getattr(self.args, optname)]
446         # Increase default read/write size from 4KiB to 128KiB
447         opts += ["big_writes", "max_read=131072"]
448         if self.args.subtype:
449             opts += ["subtype="+self.args.subtype]
450         return opts
451
452     def _setup_logging(self):
453         # Configure a log handler based on command-line switches.
454         if self.args.logfile:
455             log_handler = logging.FileHandler(self.args.logfile)
456             log_handler.setFormatter(logging.Formatter(
457                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
458                 '%Y-%m-%d %H:%M:%S'))
459         else:
460             log_handler = None
461
462         if log_handler is not None:
463             arvados.logger.removeHandler(arvados.log_handler)
464             arvados.logger.addHandler(log_handler)
465
466         if self.args.debug:
467             arvados.logger.setLevel(logging.DEBUG)
468             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
469             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
470             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
471             self.logger.debug("arv-mount debugging enabled")
472
473         self.logger.info("%s %s started", sys.argv[0], __version__)
474         self.logger.info("enable write is %s", self.args.enable_write)
475
476     def _setup_api(self):
477         try:
478             # default value of file_cache is 0, this tells KeepBlockCache to
479             # choose a default based on whether disk_cache is enabled or not.
480
481             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
482                                                       disk_cache=self.args.disk_cache,
483                                                       disk_cache_dir=self.args.disk_cache_dir)
484
485             # If there's too many prefetch threads and you
486             # max out the CPU, delivering data to the FUSE
487             # layer actually ends up being slower.
488             # Experimentally, capping 7 threads seems to
489             # be a sweet spot.
490             #prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
491             prefetch_threads = 0
492
493             self.api = arvados.safeapi.ThreadSafeApiCache(
494                 apiconfig=arvados.config.settings(),
495                 api_params={
496                     'num_retries': self.args.retries,
497                 },
498                 keep_params={
499                     'block_cache': block_cache,
500                     'num_prefetch_threads': prefetch_threads,
501                     'num_retries': self.args.retries,
502                 },
503                 version='v1',
504             )
505         except KeyError as e:
506             self.logger.error("Missing environment: %s", e)
507             exit(1)
508         # Do a sanity check that we have a working arvados host + token.
509         self.api.users().current().execute()
510
511     def _setup_mount(self):
512         self.operations = Operations(
513             os.getuid(),
514             os.getgid(),
515             api_client=self.api,
516             encoding=self.args.encoding,
517             inode_cache=InodeCache(cap=self.args.directory_cache),
518             enable_write=self.args.enable_write)
519
520         if self.args.crunchstat_interval:
521             statsthread = threading.Thread(
522                 target=crunchstat.statlogger,
523                 args=(self.args.crunchstat_interval,
524                       self.api.keep,
525                       self.operations))
526             statsthread.daemon = True
527             statsthread.start()
528
529         usr = self.api.users().current().execute(num_retries=self.args.retries)
530         now = time.time()
531         dir_class = None
532         dir_args = [
533             llfuse.ROOT_INODE,
534             self.operations.inodes,
535             self.api,
536             self.args.retries,
537             self.args.enable_write,
538             self.args.filters,
539         ]
540         mount_readme = False
541
542         storage_classes = None
543         if self.args.storage_classes is not None:
544             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
545             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
546
547         if self.args.collection is not None:
548             # Set up the request handler with the collection at the root
549             # First check that the collection is readable
550             self.api.collections().get(uuid=self.args.collection).execute()
551             self.args.mode = 'collection'
552             dir_class = CollectionDirectory
553             dir_args.append(self.args.collection)
554         elif self.args.project is not None:
555             self.args.mode = 'project'
556             dir_class = ProjectDirectory
557             dir_args.append(
558                 self.api.groups().get(uuid=self.args.project).execute(
559                     num_retries=self.args.retries))
560
561         if (self.args.mount_by_id or
562             self.args.mount_by_pdh or
563             self.args.mount_by_tag or
564             self.args.mount_home or
565             self.args.mount_shared or
566             self.args.mount_tmp):
567             if self.args.mode is not None:
568                 sys.exit(
569                     "Cannot combine '{}' mode with custom --mount-* options.".
570                     format(self.args.mode))
571         elif self.args.mode is None:
572             # If no --mount-custom or custom mount args, --all is the default
573             self.args.mode = 'all'
574
575         if self.args.mode in ['by_id', 'by_pdh']:
576             # Set up the request handler with the 'magic directory' at the root
577             dir_class = MagicDirectory
578             dir_args.append(self.args.mode == 'by_pdh')
579         elif self.args.mode == 'by_tag':
580             dir_class = TagsDirectory
581         elif self.args.mode == 'shared':
582             dir_class = SharedDirectory
583             dir_args.append(usr)
584         elif self.args.mode == 'home':
585             dir_class = ProjectDirectory
586             dir_args.append(usr)
587             dir_args.append(True)
588         elif self.args.mode == 'all':
589             self.args.mount_by_id = ['by_id']
590             self.args.mount_by_tag = ['by_tag']
591             self.args.mount_home = ['home']
592             self.args.mount_shared = ['shared']
593             mount_readme = True
594
595         if dir_class is not None:
596             if dir_class in [TagsDirectory, CollectionDirectory]:
597                 ent = dir_class(*dir_args)
598             else:
599                 ent = dir_class(*dir_args, storage_classes=storage_classes)
600             self.operations.inodes.add_entry(ent)
601             self.listen_for_events = ent.want_event_subscribe()
602             return
603
604         e = self.operations.inodes.add_entry(Directory(
605             llfuse.ROOT_INODE,
606             self.operations.inodes,
607             self.api.config,
608             self.args.enable_write,
609             self.args.filters,
610         ))
611         dir_args[0] = e.inode
612
613         for name in self.args.mount_by_id:
614             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
615         for name in self.args.mount_by_pdh:
616             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
617         for name in self.args.mount_by_tag:
618             self._add_mount(e, name, TagsDirectory(*dir_args))
619         for name in self.args.mount_home:
620             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
621         for name in self.args.mount_shared:
622             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
623         for name in self.args.mount_tmp:
624             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
625
626         if mount_readme:
627             text = self._readme_text(
628                 arvados.config.get('ARVADOS_API_HOST'),
629                 usr['email'])
630             self._add_mount(e, 'README', StringFile(e.inode, text, now))
631
632     def _add_mount(self, tld, name, ent):
633         if name in ['', '.', '..'] or '/' in name:
634             sys.exit("Mount point '{}' is not supported.".format(name))
635         tld._entries[name] = self.operations.inodes.add_entry(ent)
636         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
637
638     def _readme_text(self, api_host, user_email):
639         return '''
640 Welcome to Arvados!  This directory provides file system access to
641 files and objects available on the Arvados installation located at
642 '{}' using credentials for user '{}'.
643
644 From here, the following directories are available:
645
646   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
647   by_tag/    Access to Keep collections organized by tag.
648   home/      The contents of your home project.
649   shared/    Projects shared with you.
650
651 '''.format(api_host, user_email)
652
653     def _run_exec(self):
654         rc = 255
655         with self:
656             try:
657                 sp = subprocess.Popen(self.args.exec_args, shell=False)
658
659                 # forward signals to the process.
660                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
661                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
662                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
663
664                 # wait for process to complete.
665                 rc = sp.wait()
666
667                 # restore default signal handlers.
668                 signal.signal(signal.SIGINT, signal.SIG_DFL)
669                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
670                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
671             except Exception as e:
672                 self.logger.exception(
673                     'arv-mount: exception during exec %s', self.args.exec_args)
674                 try:
675                     rc = e.errno
676                 except AttributeError:
677                     pass
678         exit(rc)
679
680     def _run_standalone(self):
681         try:
682             self.daemon = not self.args.foreground
683             with self:
684                 self.llfuse_thread.join(timeout=None)
685         except Exception as e:
686             self.logger.exception('arv-mount: exception during mount: %s', e)
687             exit(getattr(e, 'errno', 1))
688         exit(0)
689
690     def _llfuse_main(self):
691         try:
692             llfuse.main()
693         except:
694             llfuse.close(unmount=False)
695             raise
696         llfuse.close()