Merge branch '20482-installer-improvements'. Closes #20482
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description='''Mount Keep data under the local filesystem.  Default mode is --home''',
32             epilog="""
33     Note: When using the --exec feature, you must either specify the
34     mountpoint before --exec, or mark the end of your --exec arguments
35     with "--".
36             """)
37         self.add_argument('--version', action='version',
38                           version=u"%s %s" % (sys.argv[0], __version__),
39                           help='Print version and exit.')
40         self.add_argument('mountpoint', type=str, help="""Mount point.""")
41         self.add_argument('--allow-other', action='store_true',
42                             help="""Let other users read the mount""")
43         self.add_argument('--subtype', type=str, metavar='STRING',
44                             help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")
45
46         mode = self.add_mutually_exclusive_group()
47
48         mode.add_argument('--all', action='store_const', const='all', dest='mode',
49                                 help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
50         mode.add_argument('--custom', action='store_const', const=None, dest='mode',
51                                 help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
52         mode.add_argument('--home', action='store_const', const='home', dest='mode',
53                                 help="""Mount only the user's home project.""")
54         mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
55                                 help="""Mount only list of projects shared with the user.""")
56         mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
57                                 help="""Mount subdirectories listed by tag.""")
58         mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
59                                 help="""Mount subdirectories listed by portable data hash or uuid.""")
60         mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
61                                 help="""Mount subdirectories listed by portable data hash.""")
62         mode.add_argument('--project', type=str, metavar='UUID',
63                                 help="""Mount the specified project.""")
64         mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
65                                 help="""Mount only the specified collection.""")
66
67         mounts = self.add_argument_group('Custom mount options')
68         mounts.add_argument('--mount-by-pdh',
69                             type=str, metavar='PATH', action='append', default=[],
70                             help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
71         mounts.add_argument('--mount-by-id',
72                             type=str, metavar='PATH', action='append', default=[],
73                             help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
74         mounts.add_argument('--mount-by-tag',
75                             type=str, metavar='PATH', action='append', default=[],
76                             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
77         mounts.add_argument('--mount-home',
78                             type=str, metavar='PATH', action='append', default=[],
79                             help="Mount the current user's home project at mountpoint/PATH.")
80         mounts.add_argument('--mount-shared',
81                             type=str, metavar='PATH', action='append', default=[],
82                             help="Mount projects shared with the current user at mountpoint/PATH.")
83         mounts.add_argument('--mount-tmp',
84                             type=str, metavar='PATH', action='append', default=[],
85                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
86
87
88         self.add_argument('--debug', action='store_true', help="""Debug mode""")
89         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
90         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
91         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
92
93         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
94         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
95
96         cachetype = self.add_mutually_exclusive_group()
97         cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
98         cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
99
100         self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
101
102         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
103
104         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
105         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
106         self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)
107
108         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
109
110         unmount = self.add_mutually_exclusive_group()
111         unmount.add_argument('--unmount', action='store_true', default=False,
112                              help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
113         unmount.add_argument('--unmount-all', action='store_true', default=False,
114                              help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
115         unmount.add_argument('--replace', action='store_true', default=False,
116                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
117         self.add_argument('--unmount-timeout',
118                           type=float, default=2.0,
119                           help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
120
121         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
122                             dest="exec_args", metavar=('command', 'args', '...', '--'),
123                             help="""Mount, run a command, then unmount and exit""")
124
125
126 class Mount(object):
127     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
128         self.daemon = False
129         self.logger = logger
130         self.args = args
131         self.listen_for_events = False
132
133         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
134         if self.args.logfile:
135             self.args.logfile = os.path.realpath(self.args.logfile)
136
137         try:
138             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
139             if nofile_limit[0] < 10240:
140                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(10240, nofile_limit[1]), nofile_limit[1]))
141         except Exception as e:
142             self.logger.warning("arv-mount: unable to adjust file handle limit: %s", e)
143
144         self.logger.debug("arv-mount: file handle limit is %s", resource.getrlimit(resource.RLIMIT_NOFILE))
145
146         try:
147             self._setup_logging()
148             self._setup_api()
149             self._setup_mount()
150         except Exception as e:
151             self.logger.exception("arv-mount: exception during setup: %s", e)
152             exit(1)
153
154     def __enter__(self):
155         if self.args.replace:
156             unmount(path=self.args.mountpoint,
157                     timeout=self.args.unmount_timeout)
158         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
159         if self.daemon:
160             daemon.DaemonContext(
161                 working_directory=os.path.dirname(self.args.mountpoint),
162                 files_preserve=list(range(
163                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
164             ).open()
165         if self.listen_for_events and not self.args.disable_event_listening:
166             self.operations.listen_for_events()
167         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
168         self.llfuse_thread.daemon = True
169         self.llfuse_thread.start()
170         self.operations.initlock.wait()
171         return self
172
173     def __exit__(self, exc_type, exc_value, traceback):
174         if self.operations.events:
175             self.operations.events.close(timeout=self.args.unmount_timeout)
176         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
177         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
178         if self.llfuse_thread.is_alive():
179             self.logger.warning("Mount.__exit__:"
180                                 " llfuse thread still alive %fs after umount"
181                                 " -- abandoning and exiting anyway",
182                                 self.args.unmount_timeout)
183
184     def run(self):
185         if self.args.unmount or self.args.unmount_all:
186             unmount(path=self.args.mountpoint,
187                     subtype=self.args.subtype,
188                     timeout=self.args.unmount_timeout,
189                     recursive=self.args.unmount_all)
190         elif self.args.exec_args:
191             self._run_exec()
192         else:
193             self._run_standalone()
194
195     def _fuse_options(self):
196         """FUSE mount options; see mount.fuse(8)"""
197         opts = [optname for optname in ['allow_other', 'debug']
198                 if getattr(self.args, optname)]
199         # Increase default read/write size from 4KiB to 128KiB
200         opts += ["big_writes", "max_read=131072"]
201         if self.args.subtype:
202             opts += ["subtype="+self.args.subtype]
203         return opts
204
205     def _setup_logging(self):
206         # Configure a log handler based on command-line switches.
207         if self.args.logfile:
208             log_handler = logging.FileHandler(self.args.logfile)
209             log_handler.setFormatter(logging.Formatter(
210                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
211                 '%Y-%m-%d %H:%M:%S'))
212         else:
213             log_handler = None
214
215         if log_handler is not None:
216             arvados.logger.removeHandler(arvados.log_handler)
217             arvados.logger.addHandler(log_handler)
218
219         if self.args.debug:
220             arvados.logger.setLevel(logging.DEBUG)
221             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
222             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
223             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
224             self.logger.debug("arv-mount debugging enabled")
225
226         self.logger.info("%s %s started", sys.argv[0], __version__)
227         self.logger.info("enable write is %s", self.args.enable_write)
228
229     def _setup_api(self):
230         try:
231             self.api = arvados.safeapi.ThreadSafeApiCache(
232                 apiconfig=arvados.config.settings(),
233                 api_params={
234                     'num_retries': self.args.retries,
235                 },
236                 # default value of file_cache is 0, this tells KeepBlockCache to
237                 # choose a default based on whether disk_cache is enabled or not.
238                 keep_params={
239                     'block_cache': arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
240                                                                disk_cache=self.args.disk_cache,
241                                                                disk_cache_dir=self.args.disk_cache_dir),
242                     'num_retries': self.args.retries,
243                 },
244                 version='v1',
245             )
246         except KeyError as e:
247             self.logger.error("Missing environment: %s", e)
248             exit(1)
249         # Do a sanity check that we have a working arvados host + token.
250         self.api.users().current().execute()
251
252     def _setup_mount(self):
253         self.operations = Operations(
254             os.getuid(),
255             os.getgid(),
256             api_client=self.api,
257             encoding=self.args.encoding,
258             inode_cache=InodeCache(cap=self.args.directory_cache),
259             enable_write=self.args.enable_write)
260
261         if self.args.crunchstat_interval:
262             statsthread = threading.Thread(
263                 target=crunchstat.statlogger,
264                 args=(self.args.crunchstat_interval,
265                       self.api.keep,
266                       self.operations))
267             statsthread.daemon = True
268             statsthread.start()
269
270         usr = self.api.users().current().execute(num_retries=self.args.retries)
271         now = time.time()
272         dir_class = None
273         dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
274         mount_readme = False
275
276         storage_classes = None
277         if self.args.storage_classes is not None:
278             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
279             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
280
281         if self.args.collection is not None:
282             # Set up the request handler with the collection at the root
283             # First check that the collection is readable
284             self.api.collections().get(uuid=self.args.collection).execute()
285             self.args.mode = 'collection'
286             dir_class = CollectionDirectory
287             dir_args.append(self.args.collection)
288         elif self.args.project is not None:
289             self.args.mode = 'project'
290             dir_class = ProjectDirectory
291             dir_args.append(
292                 self.api.groups().get(uuid=self.args.project).execute(
293                     num_retries=self.args.retries))
294
295         if (self.args.mount_by_id or
296             self.args.mount_by_pdh or
297             self.args.mount_by_tag or
298             self.args.mount_home or
299             self.args.mount_shared or
300             self.args.mount_tmp):
301             if self.args.mode is not None:
302                 sys.exit(
303                     "Cannot combine '{}' mode with custom --mount-* options.".
304                     format(self.args.mode))
305         elif self.args.mode is None:
306             # If no --mount-custom or custom mount args, --all is the default
307             self.args.mode = 'all'
308
309         if self.args.mode in ['by_id', 'by_pdh']:
310             # Set up the request handler with the 'magic directory' at the root
311             dir_class = MagicDirectory
312             dir_args.append(self.args.mode == 'by_pdh')
313         elif self.args.mode == 'by_tag':
314             dir_class = TagsDirectory
315         elif self.args.mode == 'shared':
316             dir_class = SharedDirectory
317             dir_args.append(usr)
318         elif self.args.mode == 'home':
319             dir_class = ProjectDirectory
320             dir_args.append(usr)
321             dir_args.append(True)
322         elif self.args.mode == 'all':
323             self.args.mount_by_id = ['by_id']
324             self.args.mount_by_tag = ['by_tag']
325             self.args.mount_home = ['home']
326             self.args.mount_shared = ['shared']
327             mount_readme = True
328
329         if dir_class is not None:
330             if dir_class in [TagsDirectory, CollectionDirectory]:
331                 ent = dir_class(*dir_args)
332             else:
333                 ent = dir_class(*dir_args, storage_classes=storage_classes)
334             self.operations.inodes.add_entry(ent)
335             self.listen_for_events = ent.want_event_subscribe()
336             return
337
338         e = self.operations.inodes.add_entry(Directory(
339             llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
340         dir_args[0] = e.inode
341
342         for name in self.args.mount_by_id:
343             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
344         for name in self.args.mount_by_pdh:
345             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
346         for name in self.args.mount_by_tag:
347             self._add_mount(e, name, TagsDirectory(*dir_args))
348         for name in self.args.mount_home:
349             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
350         for name in self.args.mount_shared:
351             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
352         for name in self.args.mount_tmp:
353             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
354
355         if mount_readme:
356             text = self._readme_text(
357                 arvados.config.get('ARVADOS_API_HOST'),
358                 usr['email'])
359             self._add_mount(e, 'README', StringFile(e.inode, text, now))
360
361     def _add_mount(self, tld, name, ent):
362         if name in ['', '.', '..'] or '/' in name:
363             sys.exit("Mount point '{}' is not supported.".format(name))
364         tld._entries[name] = self.operations.inodes.add_entry(ent)
365         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
366
367     def _readme_text(self, api_host, user_email):
368         return '''
369 Welcome to Arvados!  This directory provides file system access to
370 files and objects available on the Arvados installation located at
371 '{}' using credentials for user '{}'.
372
373 From here, the following directories are available:
374
375   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
376   by_tag/    Access to Keep collections organized by tag.
377   home/      The contents of your home project.
378   shared/    Projects shared with you.
379
380 '''.format(api_host, user_email)
381
382     def _run_exec(self):
383         rc = 255
384         with self:
385             try:
386                 sp = subprocess.Popen(self.args.exec_args, shell=False)
387
388                 # forward signals to the process.
389                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
390                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
391                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
392
393                 # wait for process to complete.
394                 rc = sp.wait()
395
396                 # restore default signal handlers.
397                 signal.signal(signal.SIGINT, signal.SIG_DFL)
398                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
399                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
400             except Exception as e:
401                 self.logger.exception(
402                     'arv-mount: exception during exec %s', self.args.exec_args)
403                 try:
404                     rc = e.errno
405                 except AttributeError:
406                     pass
407         exit(rc)
408
409     def _run_standalone(self):
410         try:
411             self.daemon = not self.args.foreground
412             with self:
413                 self.llfuse_thread.join(timeout=None)
414         except Exception as e:
415             self.logger.exception('arv-mount: exception during mount: %s', e)
416             exit(getattr(e, 'errno', 1))
417         exit(0)
418
419     def _llfuse_main(self):
420         try:
421             llfuse.main()
422         except:
423             llfuse.close(unmount=False)
424             raise
425         llfuse.close()