Merge branch '21717-keepstore-cors'
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="Interact with Arvados data through a local filesystem",
32         )
33         self.add_argument(
34             '--version',
35             action='version',
36             version=u"%s %s" % (sys.argv[0], __version__),
37             help="Print version and exit",
38         )
39         self.add_argument(
40             'mountpoint',
41             metavar='MOUNT_DIR',
42             help="Directory path to mount data",
43         )
44
45         mode_group = self.add_argument_group("Mount contents")
46         mode = mode_group.add_mutually_exclusive_group()
47         mode.add_argument(
48             '--all',
49             action='store_const',
50             const='all',
51             dest='mode',
52             help="""
53 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
54 (default if no `--mount-*` options are given)
55 """,
56         )
57         mode.add_argument(
58             '--custom',
59             action='store_const',
60             const=None,
61             dest='mode',
62             help="""
63 Mount a subdirectory for each mode specified by a `--mount-*` option
64 (default if any `--mount-*` options are given;
65 see "Mount custom layout and filtering" section)
66 """,
67         )
68         mode.add_argument(
69             '--collection',
70             metavar='UUID_OR_PDH',
71             help="Mount the specified collection",
72         )
73         mode.add_argument(
74             '--home',
75             action='store_const',
76             const='home',
77             dest='mode',
78             help="Mount your home project",
79         )
80         mode.add_argument(
81             '--project',
82             metavar='UUID',
83             help="Mount the specified project",
84         )
85         mode.add_argument(
86             '--shared',
87             action='store_const',
88             const='shared',
89             dest='mode',
90             help="Mount a subdirectory for each project shared with you",
91         )
92         mode.add_argument(
93             '--by-id',
94             action='store_const',
95             const='by_id',
96             dest='mode',
97             help="""
98 Mount a magic directory where collections and projects are accessible through
99 subdirectories named after their UUID or portable data hash
100 """,
101         )
102         mode.add_argument(
103             '--by-pdh',
104             action='store_const',
105             const='by_pdh',
106             dest='mode',
107             help="""
108 Mount a magic directory where collections are accessible through
109 subdirectories named after their portable data hash
110 """,
111         )
112         mode.add_argument(
113             '--by-tag',
114             action='store_const',
115             const='by_tag',
116             dest='mode',
117             help="Mount a subdirectory for each tag attached to a collection or project",
118         )
119
120         mounts = self.add_argument_group("Mount custom layout and filtering")
121         mounts.add_argument(
122             '--filters',
123             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
124             help="""
125 Filters to apply to all project, shared, and tag directory contents.
126 Pass filters as either a JSON string or a path to a JSON file.
127 The JSON object should be a list of filters in Arvados API list filter syntax.
128 """,
129         )
130         mounts.add_argument(
131             '--mount-home',
132             metavar='PATH',
133             action='append',
134             default=[],
135             help="Make your home project available under the mount at `PATH`",
136         )
137         mounts.add_argument(
138             '--mount-shared',
139             metavar='PATH',
140             action='append',
141             default=[],
142             help="Make projects shared with you available under the mount at `PATH`",
143         )
144         mounts.add_argument(
145             '--mount-tmp',
146             metavar='PATH',
147             action='append',
148             default=[],
149             help="""
150 Make a new temporary writable collection available under the mount at `PATH`.
151 This collection is deleted when the mount is unmounted.
152 """,
153         )
154         mounts.add_argument(
155             '--mount-by-id',
156             metavar='PATH',
157             action='append',
158             default=[],
159             help="""
160 Make a magic directory available under the mount at `PATH` where collections and
161 projects are accessible through subdirectories named after their UUID or
162 portable data hash
163 """,
164         )
165         mounts.add_argument(
166             '--mount-by-pdh',
167             metavar='PATH',
168             action='append',
169             default=[],
170             help="""
171 Make a magic directory available under the mount at `PATH` where collections
172 are accessible through subdirectories named after portable data hash
173 """,
174         )
175         mounts.add_argument(
176             '--mount-by-tag',
177             metavar='PATH',
178             action='append',
179             default=[],
180             help="""
181 Make a subdirectory for each tag attached to a collection or project available
182 under the mount at `PATH`
183 """ ,
184         )
185
186         perms = self.add_argument_group("Mount access and permissions")
187         perms.add_argument(
188             '--allow-other',
189             action='store_true',
190             help="Let other users on this system read mounted data (default false)",
191         )
192         perms.add_argument(
193             '--read-only',
194             action='store_false',
195             default=False,
196             dest='enable_write',
197             help="Mounted data cannot be modified from the mount (default)",
198         )
199         perms.add_argument(
200             '--read-write',
201             action='store_true',
202             default=False,
203             dest='enable_write',
204             help="Mounted data can be modified from the mount",
205         )
206
207         lifecycle = self.add_argument_group("Mount lifecycle management")
208         lifecycle.add_argument(
209             '--exec',
210             nargs=argparse.REMAINDER,
211             dest="exec_args",
212             help="""
213 Mount data, run the specified command, then unmount and exit.
214 `--exec` reads all remaining options as the command to run,
215 so it must be the last option you specify.
216 Either end your command arguments (and other options) with a `--` argument,
217 or specify `--exec` after your mount point.
218 """,
219         )
220         lifecycle.add_argument(
221             '--foreground',
222             action='store_true',
223             default=False,
224             help="Run mount process in the foreground instead of daemonizing (default false)",
225         )
226         lifecycle.add_argument(
227             '--subtype',
228             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
229         )
230         unmount = lifecycle.add_mutually_exclusive_group()
231         unmount.add_argument(
232             '--replace',
233             action='store_true',
234             default=False,
235             help="""
236 If a FUSE mount is already mounted at the given directory,
237 unmount it before mounting the requested data.
238 If `--subtype` is specified, unmount only if the mount has that subtype.
239 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
240 """,
241         )
242         unmount.add_argument(
243             '--unmount',
244             action='store_true',
245             default=False,
246             help="""
247 If a FUSE mount is already mounted at the given directory, unmount it and exit.
248 If `--subtype` is specified, unmount only if the mount has that subtype.
249 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
250 """,
251         )
252         unmount.add_argument(
253             '--unmount-all',
254             action='store_true',
255             default=False,
256             help="""
257 Unmount all FUSE mounts at or below the given directory, then exit.
258 If `--subtype` is specified, unmount only if the mount has that subtype.
259 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
260 """,
261         )
262         lifecycle.add_argument(
263             '--unmount-timeout',
264             type=float,
265             default=2.0,
266             metavar='SECONDS',
267             help="""
268 The number of seconds to wait for a clean unmount after an `--exec` command has
269 exited (default %(default).01f).
270 After this time, the mount will be forcefully unmounted.
271 """,
272         )
273
274         reporting = self.add_argument_group("Mount logging and statistics")
275         reporting.add_argument(
276             '--crunchstat-interval',
277             type=float,
278             default=0.0,
279             metavar='SECONDS',
280             help="Write stats to stderr every N seconds (default disabled)",
281         )
282         reporting.add_argument(
283             '--debug',
284             action='store_true',
285             help="Log debug information",
286         )
287         reporting.add_argument(
288             '--logfile',
289             help="Write debug logs and errors to the specified file (default stderr)",
290         )
291
292         cache = self.add_argument_group("Mount local cache setup")
293         cachetype = cache.add_mutually_exclusive_group()
294         cachetype.add_argument(
295             '--disk-cache',
296             action='store_true',
297             default=True,
298             dest='disk_cache',
299             help="Cache data on the local filesystem (default)",
300         )
301         cachetype.add_argument(
302             '--ram-cache',
303             action='store_false',
304             default=True,
305             dest='disk_cache',
306             help="Cache data in memory",
307         )
308         cache.add_argument(
309             '--disk-cache-dir',
310             metavar="DIRECTORY",
311             help="Filesystem cache location (default `~/.cache/arvados/keep`)",
312         )
313         cache.add_argument(
314             '--directory-cache',
315             type=int,
316             default=128*1024*1024,
317             metavar='BYTES',
318             help="Size of directory data cache in bytes (default 128 MiB)",
319         )
320         cache.add_argument(
321             '--file-cache',
322             type=int,
323             default=0,
324             metavar='BYTES',
325             help="""
326 Size of file data cache in bytes
327 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
328 """,
329         )
330
331         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
332         plumbing.add_argument(
333             '--disable-event-listening',
334             action='store_true',
335             dest='disable_event_listening',
336             default=False,
337             help="Don't subscribe to events on the API server to update mount contents",
338         )
339         plumbing.add_argument(
340             '--encoding',
341             default="utf-8",
342             help="""
343 Filesystem character encoding
344 (default %(default)r; specify a name from the Python codec registry)
345 """,
346         )
347         plumbing.add_argument(
348             '--storage-classes',
349             metavar='CLASSES',
350             help="Comma-separated list of storage classes to request for new collections",
351         )
352         # This is a hidden argument used by tests.  Normally this
353         # value will be extracted from the cluster config, but mocking
354         # the cluster config under the presence of multiple threads
355         # and processes turned out to be too complicated and brittle.
356         plumbing.add_argument(
357             '--fsns',
358             type=str,
359             default=None,
360             help=argparse.SUPPRESS)
361
362 class Mount(object):
363     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
364         self.daemon = False
365         self.logger = logger
366         self.args = args
367         self.listen_for_events = False
368
369         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
370         if self.args.logfile:
371             self.args.logfile = os.path.realpath(self.args.logfile)
372
373         try:
374             self._setup_logging()
375         except Exception as e:
376             self.logger.exception("exception during setup: %s", e)
377             exit(1)
378
379         try:
380             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
381
382             minlimit = 10240
383             if self.args.file_cache:
384                 # Adjust the file handle limit so it can meet
385                 # the desired cache size. Multiply by 8 because the
386                 # number of 64 MiB cache slots that keepclient
387                 # allocates is RLIMIT_NOFILE / 8
388                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
389
390             if nofile_limit[0] < minlimit:
391                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
392
393             if minlimit > nofile_limit[1]:
394                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
395
396         except Exception as e:
397             self.logger.warning("unable to adjust file handle limit: %s", e)
398
399         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
400         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
401
402         try:
403             self._setup_api()
404             self._setup_mount()
405         except Exception as e:
406             self.logger.exception("exception during setup: %s", e)
407             exit(1)
408
409     def __enter__(self):
410         if self.args.replace:
411             unmount(path=self.args.mountpoint,
412                     timeout=self.args.unmount_timeout)
413         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
414         if self.daemon:
415             daemon.DaemonContext(
416                 working_directory=os.path.dirname(self.args.mountpoint),
417                 files_preserve=list(range(
418                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
419             ).open()
420         if self.listen_for_events and not self.args.disable_event_listening:
421             self.operations.listen_for_events()
422         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
423         self.llfuse_thread.daemon = True
424         self.llfuse_thread.start()
425         self.operations.initlock.wait()
426         return self
427
428     def __exit__(self, exc_type, exc_value, traceback):
429         if self.operations.events:
430             self.operations.events.close(timeout=self.args.unmount_timeout)
431         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
432         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
433         if self.llfuse_thread.is_alive():
434             self.logger.warning("Mount.__exit__:"
435                                 " llfuse thread still alive %fs after umount"
436                                 " -- abandoning and exiting anyway",
437                                 self.args.unmount_timeout)
438
439     def run(self):
440         if self.args.unmount or self.args.unmount_all:
441             unmount(path=self.args.mountpoint,
442                     subtype=self.args.subtype,
443                     timeout=self.args.unmount_timeout,
444                     recursive=self.args.unmount_all)
445         elif self.args.exec_args:
446             self._run_exec()
447         else:
448             self._run_standalone()
449
450     def _fuse_options(self):
451         """FUSE mount options; see mount.fuse(8)"""
452         opts = [optname for optname in ['allow_other', 'debug']
453                 if getattr(self.args, optname)]
454         # Increase default read/write size from 4KiB to 128KiB
455         opts += ["big_writes", "max_read=131072"]
456         if self.args.subtype:
457             opts += ["subtype="+self.args.subtype]
458         return opts
459
460     def _setup_logging(self):
461         # Configure a log handler based on command-line switches.
462         if self.args.logfile:
463             log_handler = logging.FileHandler(self.args.logfile)
464             log_handler.setFormatter(logging.Formatter(
465                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
466                 '%Y-%m-%d %H:%M:%S'))
467         else:
468             log_handler = None
469
470         if log_handler is not None:
471             arvados.logger.removeHandler(arvados.log_handler)
472             arvados.logger.addHandler(log_handler)
473
474         if self.args.debug:
475             arvados.logger.setLevel(logging.DEBUG)
476             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
477             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
478             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
479             self.logger.debug("arv-mount debugging enabled")
480
481         self.logger.info("%s %s started", sys.argv[0], __version__)
482         self.logger.info("enable write is %s", self.args.enable_write)
483
484     def _setup_api(self):
485         try:
486             # default value of file_cache is 0, this tells KeepBlockCache to
487             # choose a default based on whether disk_cache is enabled or not.
488
489             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
490                                                       disk_cache=self.args.disk_cache,
491                                                       disk_cache_dir=self.args.disk_cache_dir)
492
493             self.api = arvados.safeapi.ThreadSafeApiCache(
494                 apiconfig=arvados.config.settings(),
495                 api_params={
496                     'num_retries': self.args.retries,
497                 },
498                 keep_params={
499                     'block_cache': block_cache,
500                     'num_retries': self.args.retries,
501                 },
502                 version='v1',
503             )
504         except KeyError as e:
505             self.logger.error("Missing environment: %s", e)
506             exit(1)
507         # Do a sanity check that we have a working arvados host + token.
508         self.api.users().current().execute()
509
510     def _setup_mount(self):
511         self.operations = Operations(
512             os.getuid(),
513             os.getgid(),
514             api_client=self.api,
515             encoding=self.args.encoding,
516             inode_cache=InodeCache(cap=self.args.directory_cache),
517             enable_write=self.args.enable_write,
518             fsns=self.args.fsns)
519
520         if self.args.crunchstat_interval:
521             statsthread = threading.Thread(
522                 target=crunchstat.statlogger,
523                 args=(self.args.crunchstat_interval,
524                       self.api.keep,
525                       self.operations))
526             statsthread.daemon = True
527             statsthread.start()
528
529         usr = self.api.users().current().execute(num_retries=self.args.retries)
530         now = time.time()
531         dir_class = None
532         dir_args = [
533             llfuse.ROOT_INODE,
534             self.operations.inodes,
535             self.api,
536             self.args.retries,
537             self.args.enable_write,
538             self.args.filters,
539         ]
540         mount_readme = False
541
542         storage_classes = None
543         if self.args.storage_classes is not None:
544             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
545             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
546
547         if self.args.collection is not None:
548             # Set up the request handler with the collection at the root
549             # First check that the collection is readable
550             self.api.collections().get(uuid=self.args.collection).execute()
551             self.args.mode = 'collection'
552             dir_class = CollectionDirectory
553             dir_args.append(self.args.collection)
554         elif self.args.project is not None:
555             self.args.mode = 'project'
556             dir_class = ProjectDirectory
557             dir_args.append(
558                 self.api.groups().get(uuid=self.args.project).execute(
559                     num_retries=self.args.retries))
560
561         if (self.args.mount_by_id or
562             self.args.mount_by_pdh or
563             self.args.mount_by_tag or
564             self.args.mount_home or
565             self.args.mount_shared or
566             self.args.mount_tmp):
567             if self.args.mode is not None:
568                 sys.exit(
569                     "Cannot combine '{}' mode with custom --mount-* options.".
570                     format(self.args.mode))
571         elif self.args.mode is None:
572             # If no --mount-custom or custom mount args, --all is the default
573             self.args.mode = 'all'
574
575         if self.args.mode in ['by_id', 'by_pdh']:
576             # Set up the request handler with the 'magic directory' at the root
577             dir_class = MagicDirectory
578             dir_args.append(self.args.mode == 'by_pdh')
579         elif self.args.mode == 'by_tag':
580             dir_class = TagsDirectory
581         elif self.args.mode == 'shared':
582             dir_class = SharedDirectory
583             dir_args.append(usr)
584         elif self.args.mode == 'home':
585             dir_class = ProjectDirectory
586             dir_args.append(usr)
587             dir_args.append(True)
588         elif self.args.mode == 'all':
589             self.args.mount_by_id = ['by_id']
590             self.args.mount_by_tag = ['by_tag']
591             self.args.mount_home = ['home']
592             self.args.mount_shared = ['shared']
593             mount_readme = True
594
595         if dir_class is not None:
596             if dir_class in [TagsDirectory, CollectionDirectory]:
597                 ent = dir_class(*dir_args)
598             else:
599                 ent = dir_class(*dir_args, storage_classes=storage_classes)
600             self.operations.inodes.add_entry(ent)
601             self.listen_for_events = ent.want_event_subscribe()
602             return
603
604         e = self.operations.inodes.add_entry(Directory(
605             llfuse.ROOT_INODE,
606             self.operations.inodes,
607             self.args.enable_write,
608             self.args.filters,
609         ))
610         dir_args[0] = e.inode
611
612         for name in self.args.mount_by_id:
613             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
614         for name in self.args.mount_by_pdh:
615             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
616         for name in self.args.mount_by_tag:
617             self._add_mount(e, name, TagsDirectory(*dir_args))
618         for name in self.args.mount_home:
619             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
620         for name in self.args.mount_shared:
621             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
622         for name in self.args.mount_tmp:
623             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
624
625         if mount_readme:
626             text = self._readme_text(
627                 arvados.config.get('ARVADOS_API_HOST'),
628                 usr['email'])
629             self._add_mount(e, 'README', StringFile(e.inode, text, now))
630
631     def _add_mount(self, tld, name, ent):
632         if name in ['', '.', '..'] or '/' in name:
633             sys.exit("Mount point '{}' is not supported.".format(name))
634         tld._entries[name] = self.operations.inodes.add_entry(ent)
635         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
636
637     def _readme_text(self, api_host, user_email):
638         return '''
639 Welcome to Arvados!  This directory provides file system access to
640 files and objects available on the Arvados installation located at
641 '{}' using credentials for user '{}'.
642
643 From here, the following directories are available:
644
645   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
646   by_tag/    Access to Keep collections organized by tag.
647   home/      The contents of your home project.
648   shared/    Projects shared with you.
649
650 '''.format(api_host, user_email)
651
652     def _run_exec(self):
653         rc = 255
654         with self:
655             try:
656                 sp = subprocess.Popen(self.args.exec_args, shell=False)
657
658                 # forward signals to the process.
659                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
660                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
661                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
662
663                 # wait for process to complete.
664                 rc = sp.wait()
665
666                 # restore default signal handlers.
667                 signal.signal(signal.SIGINT, signal.SIG_DFL)
668                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
669                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
670             except Exception as e:
671                 self.logger.exception(
672                     'arv-mount: exception during exec %s', self.args.exec_args)
673                 try:
674                     rc = e.errno
675                 except AttributeError:
676                     pass
677         exit(rc)
678
679     def _run_standalone(self):
680         try:
681             self.daemon = not self.args.foreground
682             with self:
683                 self.llfuse_thread.join(timeout=None)
684         except Exception as e:
685             self.logger.exception('arv-mount: exception during mount: %s', e)
686             exit(getattr(e, 'errno', 1))
687         exit(0)
688
689     def _llfuse_main(self):
690         try:
691             llfuse.main(workers=10)
692         except:
693             llfuse.close(unmount=False)
694             raise
695         self.operations.begin_shutdown()
696         llfuse.close()