18941: Clamp to 4 block prefetch
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
from future.utils import native_str
from builtins import range
from builtins import object
import argparse
import logging
import os
import resource
import signal
import subprocess
import sys
import threading
import time

import arvados
import daemon
import llfuse

import arvados.commands._util as arv_cmd
from arvados_fuse import crunchstat
from arvados_fuse import *
from arvados_fuse.unmount import unmount
from arvados_fuse._version import __version__
25
class ArgumentParser(argparse.ArgumentParser):
    """Command-line argument parser for arv-mount.

    Inherits the shared retry option from ``arv_cmd.retry_opt`` and adds the
    mount-mode, custom-mount, caching, logging, unmount and --exec options.
    """

    def __init__(self):
        super(ArgumentParser, self).__init__(
            parents=[arv_cmd.retry_opt],
            # With no mode flag and no --mount-* option, Mount._setup_mount
            # falls back to --all; with any --mount-* option it behaves like
            # --custom.  The description must advertise that, not --home.
            description='''Mount Keep data under the local filesystem.  Default mode is --all, or --custom if any --mount-* options are given.''',
            epilog="""
    Note: When using the --exec feature, you must either specify the
    mountpoint before --exec, or mark the end of your --exec arguments
    with "--".
            """)
        self.add_argument('--version', action='version',
                          version=u"%s %s" % (sys.argv[0], __version__),
                          help='Print version and exit.')
        self.add_argument('mountpoint', type=str, help="""Mount point.""")
        self.add_argument('--allow-other', action='store_true',
                          help="""Let other users read the mount""")
        self.add_argument('--subtype', type=str, metavar='STRING',
                          help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")

        # Mode selection: at most one of these may be given.  The store_const
        # flags all write into args.mode; --custom stores None so that
        # Mount._setup_mount treats it the same as "no mode given".
        mode = self.add_mutually_exclusive_group()
        mode_flags = (
            ('--all', 'all',
             """Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given)."""),
            ('--custom', None,
             """Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given)."""),
            ('--home', 'home',
             """Mount only the user's home project."""),
            ('--shared', 'shared',
             """Mount only list of projects shared with the user."""),
            ('--by-tag', 'by_tag',
             """Mount subdirectories listed by tag."""),
            ('--by-id', 'by_id',
             """Mount subdirectories listed by portable data hash or uuid."""),
            ('--by-pdh', 'by_pdh',
             """Mount subdirectories listed by portable data hash."""),
        )
        for flag, const, text in mode_flags:
            mode.add_argument(flag, action='store_const', const=const,
                              dest='mode', help=text)
        mode.add_argument('--project', type=str, metavar='UUID',
                          help="""Mount the specified project.""")
        mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
                          help="""Mount only the specified collection.""")

        # Custom layout: each option may be repeated; every PATH becomes a
        # top-level entry under the mountpoint.  A fresh [] default is built
        # per option so no two options share a list.
        mounts = self.add_argument_group('Custom mount options')
        mount_opts = (
            ('--mount-by-pdh',
             "Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash."),
            ('--mount-by-id',
             "Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID."),
            ('--mount-by-tag',
             "Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID."),
            ('--mount-home',
             "Mount the current user's home project at mountpoint/PATH."),
            ('--mount-shared',
             "Mount projects shared with the current user at mountpoint/PATH."),
            ('--mount-tmp',
             "Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting."),
        )
        for flag, text in mount_opts:
            mounts.add_argument(flag, type=str, metavar='PATH',
                                action='append', default=[], help=text)

        self.add_argument('--debug', action='store_true', help="""Debug mode""")
        self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
        self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
        self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")

        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)

        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)

        # --read-only and --read-write share dest=enable_write; read-only is
        # the default (False).
        self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
        self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
        self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)

        self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)

        # Named unmount_opts so it does not shadow the imported unmount().
        unmount_opts = self.add_mutually_exclusive_group()
        unmount_opts.add_argument('--unmount', action='store_true', default=False,
                                  help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_opts.add_argument('--unmount-all', action='store_true', default=False,
                                  help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_opts.add_argument('--replace', action='store_true', default=False,
                                  help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
        self.add_argument('--unmount-timeout',
                          type=float, default=2.0,
                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")

        # REMAINDER swallows everything after --exec, hence the epilog note
        # about ordering / terminating with "--".
        self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                          dest="exec_args", metavar=('command', 'args', '...', '--'),
                          help="""Mount, run a command, then unmount and exit""")
116
117
class Mount(object):
    """Set up, serve, and tear down an arv-mount FUSE filesystem.

    Construction configures logging, the Arvados API client and the directory
    tree; any setup failure is logged and the process exits with status 1.
    The instance is a context manager: entering mounts the filesystem and
    runs the llfuse main loop in a background thread, exiting unmounts it.
    ``run()`` dispatches to unmount-only, --exec, or standalone operation.
    """

    def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
        """Prepare a mount from parsed command-line ``args``.

        ``args.mountpoint`` (and ``args.logfile``, if set) are rewritten in
        place as real absolute paths.
        """
        self.daemon = False
        self.logger = logger
        self.args = args
        self.listen_for_events = False

        self.args.mountpoint = os.path.realpath(self.args.mountpoint)
        if self.args.logfile:
            self.args.logfile = os.path.realpath(self.args.logfile)

        try:
            self._setup_logging()
            self._setup_api()
            self._setup_mount()
        except Exception as e:
            self.logger.exception("arv-mount: exception during setup: %s", e)
            sys.exit(1)

    def __enter__(self):
        """Mount the filesystem and start the llfuse loop; return self.

        With --replace, any FUSE mount already at the mountpoint is forcibly
        unmounted first.  If ``self.daemon`` is set, the process detaches via
        DaemonContext while keeping every fd >= 3 open (presumably so the FUSE
        device opened by llfuse.init survives -- TODO confirm).  Blocks until
        ``operations.initlock`` is signalled.
        """
        if self.args.replace:
            unmount(path=self.args.mountpoint,
                    timeout=self.args.unmount_timeout)
        llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
        if self.daemon:
            daemon.DaemonContext(
                working_directory=os.path.dirname(self.args.mountpoint),
                files_preserve=list(range(
                    3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
            ).open()
        if self.listen_for_events and not self.args.disable_event_listening:
            self.operations.listen_for_events()
        self.llfuse_thread = threading.Thread(None, self._llfuse_main)
        # Daemon thread: a stuck llfuse loop must not keep the process alive
        # after __exit__ gives up waiting for it.
        self.llfuse_thread.daemon = True
        self.llfuse_thread.start()
        self.operations.initlock.wait()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close event listening, lazily unmount, and wait for the llfuse thread.

        Waits at most ``args.unmount_timeout`` seconds for the thread; if it
        is still alive afterwards a warning is logged and it is abandoned.
        """
        if self.operations.events:
            self.operations.events.close(timeout=self.args.unmount_timeout)
        subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
        if self.llfuse_thread.is_alive():
            self.logger.warning("Mount.__exit__:"
                                " llfuse thread still alive %fs after umount"
                                " -- abandoning and exiting anyway",
                                self.args.unmount_timeout)

    def run(self):
        """Perform the action selected on the command line.

        --unmount / --unmount-all only unmount; --exec mounts, runs the
        command and exits with its status; otherwise serve until unmounted.
        """
        if self.args.unmount or self.args.unmount_all:
            unmount(path=self.args.mountpoint,
                    subtype=self.args.subtype,
                    timeout=self.args.unmount_timeout,
                    recursive=self.args.unmount_all)
        elif self.args.exec_args:
            self._run_exec()
        else:
            self._run_standalone()

    def _fuse_options(self):
        """FUSE mount options; see mount.fuse(8)"""
        opts = [optname for optname in ['allow_other', 'debug']
                if getattr(self.args, optname)]
        # Increase default read/write size from 4KiB to 128KiB
        opts += ["big_writes", "max_read=131072"]
        if self.args.subtype:
            opts += ["subtype="+self.args.subtype]
        return opts

    def _setup_logging(self):
        """Route arvados logging to --logfile (if given) and apply --debug."""
        # Configure a log handler based on command-line switches.
        if self.args.logfile:
            log_handler = logging.FileHandler(self.args.logfile)
            log_handler.setFormatter(logging.Formatter(
                '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
                '%Y-%m-%d %H:%M:%S'))
        else:
            log_handler = None

        if log_handler is not None:
            # Replace the SDK's default handler rather than adding alongside it.
            arvados.logger.removeHandler(arvados.log_handler)
            arvados.logger.addHandler(log_handler)

        if self.args.debug:
            arvados.logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
            logging.getLogger('arvados.api').setLevel(logging.DEBUG)
            logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
            self.logger.debug("arv-mount debugging enabled")

        self.logger.info("%s %s started", sys.argv[0], __version__)
        self.logger.info("enable write is %s", self.args.enable_write)

    def _setup_api(self):
        """Create the thread-safe API client and verify the credentials work.

        Exits with status 1 if the configuration is missing a required key.
        """
        try:
            self.api = arvados.safeapi.ThreadSafeApiCache(
                apiconfig=arvados.config.settings(),
                keep_params={
                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
                    'num_retries': self.args.retries,
                })
        except KeyError as e:
            self.logger.error("Missing environment: %s", e)
            sys.exit(1)
        # Do a sanity check that we have a working arvados host + token,
        # honoring --retries like the other API calls in this class.
        self.api.users().current().execute(num_retries=self.args.retries)

    def _setup_mount(self):
        """Build the Operations object and populate the root directory.

        A single-mode invocation (--collection, --project, --home, --shared,
        --by-tag, --by-id, --by-pdh) installs one directory object as the
        root.  Otherwise a plain Directory root is created and each --mount-*
        PATH is attached beneath it; --all (the default when no mode and no
        --mount-* option is given) fills in the standard set plus a README.
        """
        self.operations = Operations(
            os.getuid(),
            os.getgid(),
            api_client=self.api,
            encoding=self.args.encoding,
            inode_cache=InodeCache(cap=self.args.directory_cache),
            enable_write=self.args.enable_write)

        if self.args.crunchstat_interval:
            statsthread = threading.Thread(
                target=crunchstat.statlogger,
                args=(self.args.crunchstat_interval,
                      self.api.keep,
                      self.operations))
            statsthread.daemon = True
            statsthread.start()

        usr = self.api.users().current().execute(num_retries=self.args.retries)
        now = time.time()
        dir_class = None
        # Positional constructor args shared by every directory class; the
        # parent inode (index 0) is replaced below for custom layouts.
        dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
        mount_readme = False

        storage_classes = None
        if self.args.storage_classes is not None:
            storage_classes = self.args.storage_classes.replace(' ', '').split(',')
            self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))

        if self.args.collection is not None:
            # Set up the request handler with the collection at the root
            # First check that the collection is readable
            self.api.collections().get(uuid=self.args.collection).execute()
            self.args.mode = 'collection'
            dir_class = CollectionDirectory
            dir_args.append(self.args.collection)
        elif self.args.project is not None:
            self.args.mode = 'project'
            dir_class = ProjectDirectory
            dir_args.append(
                self.api.groups().get(uuid=self.args.project).execute(
                    num_retries=self.args.retries))

        if (self.args.mount_by_id or
            self.args.mount_by_pdh or
            self.args.mount_by_tag or
            self.args.mount_home or
            self.args.mount_shared or
            self.args.mount_tmp):
            if self.args.mode is not None:
                sys.exit(
                    "Cannot combine '{}' mode with custom --mount-* options.".
                    format(self.args.mode))
        elif self.args.mode is None:
            # Neither a mode flag (--custom stores None) nor any --mount-*
            # option was given: --all is the default.
            self.args.mode = 'all'

        if self.args.mode in ['by_id', 'by_pdh']:
            # Set up the request handler with the 'magic directory' at the root
            dir_class = MagicDirectory
            dir_args.append(self.args.mode == 'by_pdh')
        elif self.args.mode == 'by_tag':
            dir_class = TagsDirectory
        elif self.args.mode == 'shared':
            dir_class = SharedDirectory
            dir_args.append(usr)
        elif self.args.mode == 'home':
            dir_class = ProjectDirectory
            dir_args.append(usr)
            dir_args.append(True)
        elif self.args.mode == 'all':
            self.args.mount_by_id = ['by_id']
            self.args.mount_by_tag = ['by_tag']
            self.args.mount_home = ['home']
            self.args.mount_shared = ['shared']
            mount_readme = True

        if dir_class is not None:
            if dir_class in [TagsDirectory, CollectionDirectory]:
                ent = dir_class(*dir_args)
            else:
                ent = dir_class(*dir_args, storage_classes=storage_classes)
            self.operations.inodes.add_entry(ent)
            self.listen_for_events = ent.want_event_subscribe()
            return

        e = self.operations.inodes.add_entry(Directory(
            llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
        dir_args[0] = e.inode

        for name in self.args.mount_by_id:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
        for name in self.args.mount_by_pdh:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
        for name in self.args.mount_by_tag:
            self._add_mount(e, name, TagsDirectory(*dir_args))
        for name in self.args.mount_home:
            self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
        for name in self.args.mount_shared:
            self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
        for name in self.args.mount_tmp:
            self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))

        if mount_readme:
            text = self._readme_text(
                arvados.config.get('ARVADOS_API_HOST'),
                usr['email'])
            self._add_mount(e, 'README', StringFile(e.inode, text, now))

    def _add_mount(self, tld, name, ent):
        """Register ``ent`` as entry ``name`` of directory ``tld``.

        Exits the process if ``name`` is empty, '.', '..' or contains '/'.
        """
        if name in ['', '.', '..'] or '/' in name:
            sys.exit("Mount point '{}' is not supported.".format(name))
        tld._entries[name] = self.operations.inodes.add_entry(ent)
        self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())

    def _readme_text(self, api_host, user_email):
        """Return the README text shown at the root of an --all mount."""
        return '''
Welcome to Arvados!  This directory provides file system access to
files and objects available on the Arvados installation located at
'{}' using credentials for user '{}'.

From here, the following directories are available:

  by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
  by_tag/    Access to Keep collections organized by tag.
  home/      The contents of your home project.
  shared/    Projects shared with you.

'''.format(api_host, user_email)

    def _run_exec(self):
        """Mount, run the --exec command, unmount, then exit with its status.

        SIGINT/SIGTERM/SIGQUIT are forwarded to the child while it runs and
        reset to SIG_DFL afterwards, even if waiting fails.  If starting or
        waiting on the command raises, the exit status is the exception's
        errno when it has a non-zero one, else 255.
        """
        rc = 255
        with self:
            try:
                sp = subprocess.Popen(self.args.exec_args, shell=False)

                def forward(signum, frame):
                    sp.send_signal(signum)

                forwarded = (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT)
                # forward signals to the process.
                for signum in forwarded:
                    signal.signal(signum, forward)
                try:
                    # wait for process to complete.
                    rc = sp.wait()
                finally:
                    # restore default signal handlers.
                    for signum in forwarded:
                        signal.signal(signum, signal.SIG_DFL)
            except Exception as e:
                self.logger.exception(
                    'arv-mount: exception during exec %s', self.args.exec_args)
                # errno may be missing or None (e.g. OSError("msg")); never
                # let a failure turn into exit status 0.
                rc = getattr(e, 'errno', None) or 255
        sys.exit(rc)

    def _run_standalone(self):
        """Mount (daemonizing unless --foreground) and serve until unmounted.

        Exits 0 on a clean unmount.  On an exception the exit status is its
        errno when it has a non-zero one, else 1.
        """
        try:
            self.daemon = not self.args.foreground
            with self:
                self.llfuse_thread.join(timeout=None)
        except Exception as e:
            self.logger.exception('arv-mount: exception during mount: %s', e)
            # errno may be missing or None; never report failure as status 0.
            sys.exit(getattr(e, 'errno', None) or 1)
        sys.exit(0)

    def _llfuse_main(self):
        """Body of the llfuse thread: run the FUSE loop, then close llfuse.

        If the loop raises, llfuse is closed with unmount=False and the error
        is re-raised; on normal return it is closed with the default options.
        """
        try:
            llfuse.main()
        except BaseException:
            llfuse.close(unmount=False)
            raise
        llfuse.close()