21541: Manage memory related to project directories
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 # This is speculative, but we're having low frequency SSL crashes
22 # early in the execution and so I thought maybe importing the SSL module
23 # before we start any threads might be helpful.
24 # On the other hand, I've still had it crash on me.
25 import _ssl
26
27 import arvados.commands._util as arv_cmd
28 from arvados_fuse import crunchstat
29 from arvados_fuse import *
30 from arvados_fuse.unmount import unmount
31 from arvados_fuse._version import __version__
32
33 class ArgumentParser(argparse.ArgumentParser):
34     def __init__(self):
35         super(ArgumentParser, self).__init__(
36             parents=[arv_cmd.retry_opt],
37             description='''Mount Keep data under the local filesystem.  Default mode is --home''',
38             epilog="""
39     Note: When using the --exec feature, you must either specify the
40     mountpoint before --exec, or mark the end of your --exec arguments
41     with "--".
42             """)
43         self.add_argument('--version', action='version',
44                           version=u"%s %s" % (sys.argv[0], __version__),
45                           help='Print version and exit.')
46         self.add_argument('mountpoint', type=str, help="""Mount point.""")
47         self.add_argument('--allow-other', action='store_true',
48                             help="""Let other users read the mount""")
49         self.add_argument('--subtype', type=str, metavar='STRING',
50                             help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")
51
52         mode = self.add_mutually_exclusive_group()
53
54         mode.add_argument('--all', action='store_const', const='all', dest='mode',
55                                 help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
56         mode.add_argument('--custom', action='store_const', const=None, dest='mode',
57                                 help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
58         mode.add_argument('--home', action='store_const', const='home', dest='mode',
59                                 help="""Mount only the user's home project.""")
60         mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
61                                 help="""Mount only list of projects shared with the user.""")
62         mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
63                                 help="""Mount subdirectories listed by tag.""")
64         mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
65                                 help="""Mount subdirectories listed by portable data hash or uuid.""")
66         mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
67                                 help="""Mount subdirectories listed by portable data hash.""")
68         mode.add_argument('--project', type=str, metavar='UUID',
69                                 help="""Mount the specified project.""")
70         mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
71                                 help="""Mount only the specified collection.""")
72
73         mounts = self.add_argument_group('Custom mount options')
74         mounts.add_argument('--mount-by-pdh',
75                             type=str, metavar='PATH', action='append', default=[],
76                             help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
77         mounts.add_argument('--mount-by-id',
78                             type=str, metavar='PATH', action='append', default=[],
79                             help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
80         mounts.add_argument('--mount-by-tag',
81                             type=str, metavar='PATH', action='append', default=[],
82                             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
83         mounts.add_argument('--mount-home',
84                             type=str, metavar='PATH', action='append', default=[],
85                             help="Mount the current user's home project at mountpoint/PATH.")
86         mounts.add_argument('--mount-shared',
87                             type=str, metavar='PATH', action='append', default=[],
88                             help="Mount projects shared with the current user at mountpoint/PATH.")
89         mounts.add_argument('--mount-tmp',
90                             type=str, metavar='PATH', action='append', default=[],
91                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
92
93
94         self.add_argument('--debug', action='store_true', help="""Debug mode""")
95         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
96         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
97         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
98
99         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
100         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
101
102         cachetype = self.add_mutually_exclusive_group()
103         cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
104         cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
105
106         self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
107
108         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
109
110         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
111         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
112         self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)
113
114         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
115
116         unmount = self.add_mutually_exclusive_group()
117         unmount.add_argument('--unmount', action='store_true', default=False,
118                              help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
119         unmount.add_argument('--unmount-all', action='store_true', default=False,
120                              help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
121         unmount.add_argument('--replace', action='store_true', default=False,
122                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
123         self.add_argument('--unmount-timeout',
124                           type=float, default=2.0,
125                           help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
126         self.add_argument(
127             '--filters',
128             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
129             help="""Filters to apply to all project, shared, and tag directory
130 contents. Pass filters as either a JSON string or a path to a JSON file.
131 The JSON object should be a list of filters in Arvados API list filter syntax.
132 """)
133         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
134                             dest="exec_args", metavar=('command', 'args', '...', '--'),
135                             help="""Mount, run a command, then unmount and exit""")
136
137
138 class Mount(object):
139     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
140         self.daemon = False
141         self.logger = logger
142         self.args = args
143         self.listen_for_events = False
144
145         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
146         if self.args.logfile:
147             self.args.logfile = os.path.realpath(self.args.logfile)
148
149         try:
150             self._setup_logging()
151         except Exception as e:
152             self.logger.exception("exception during setup: %s", e)
153             exit(1)
154
155         try:
156             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
157
158             minlimit = 10240
159             if self.args.file_cache:
160                 # Adjust the file handle limit so it can meet
161                 # the desired cache size. Multiply by 8 because the
162                 # number of 64 MiB cache slots that keepclient
163                 # allocates is RLIMIT_NOFILE / 8
164                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
165
166             if nofile_limit[0] < minlimit:
167                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
168
169             if minlimit > nofile_limit[1]:
170                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
171
172         except Exception as e:
173             self.logger.warning("unable to adjust file handle limit: %s", e)
174
175         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
176         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
177
178         try:
179             self._setup_api()
180             self._setup_mount()
181         except Exception as e:
182             self.logger.exception("exception during setup: %s", e)
183             exit(1)
184
185     def __enter__(self):
186         if self.args.replace:
187             unmount(path=self.args.mountpoint,
188                     timeout=self.args.unmount_timeout)
189         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
190         if self.daemon:
191             daemon.DaemonContext(
192                 working_directory=os.path.dirname(self.args.mountpoint),
193                 files_preserve=list(range(
194                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
195             ).open()
196         if self.listen_for_events and not self.args.disable_event_listening:
197             self.operations.listen_for_events()
198         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
199         self.llfuse_thread.daemon = True
200         self.llfuse_thread.start()
201         self.operations.initlock.wait()
202         return self
203
204     def __exit__(self, exc_type, exc_value, traceback):
205         if self.operations.events:
206             self.operations.events.close(timeout=self.args.unmount_timeout)
207         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
208         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
209         if self.llfuse_thread.is_alive():
210             self.logger.warning("Mount.__exit__:"
211                                 " llfuse thread still alive %fs after umount"
212                                 " -- abandoning and exiting anyway",
213                                 self.args.unmount_timeout)
214
215     def run(self):
216         if self.args.unmount or self.args.unmount_all:
217             unmount(path=self.args.mountpoint,
218                     subtype=self.args.subtype,
219                     timeout=self.args.unmount_timeout,
220                     recursive=self.args.unmount_all)
221         elif self.args.exec_args:
222             self._run_exec()
223         else:
224             self._run_standalone()
225
226     def _fuse_options(self):
227         """FUSE mount options; see mount.fuse(8)"""
228         opts = [optname for optname in ['allow_other', 'debug']
229                 if getattr(self.args, optname)]
230         # Increase default read/write size from 4KiB to 128KiB
231         opts += ["big_writes", "max_read=131072"]
232         if self.args.subtype:
233             opts += ["subtype="+self.args.subtype]
234         return opts
235
236     def _setup_logging(self):
237         # Configure a log handler based on command-line switches.
238         if self.args.logfile:
239             log_handler = logging.FileHandler(self.args.logfile)
240             log_handler.setFormatter(logging.Formatter(
241                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
242                 '%Y-%m-%d %H:%M:%S'))
243         else:
244             log_handler = None
245
246         if log_handler is not None:
247             arvados.logger.removeHandler(arvados.log_handler)
248             arvados.logger.addHandler(log_handler)
249
250         if self.args.debug:
251             arvados.logger.setLevel(logging.DEBUG)
252             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
253             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
254             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
255             self.logger.debug("arv-mount debugging enabled")
256
257         self.logger.info("%s %s started", sys.argv[0], __version__)
258         self.logger.info("enable write is %s", self.args.enable_write)
259
260     def _setup_api(self):
261         try:
262             # default value of file_cache is 0, this tells KeepBlockCache to
263             # choose a default based on whether disk_cache is enabled or not.
264
265             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
266                                                       disk_cache=self.args.disk_cache,
267                                                       disk_cache_dir=self.args.disk_cache_dir)
268
269             # If there's too many prefetch threads and you
270             # max out the CPU, delivering data to the FUSE
271             # layer actually ends up being slower.
272             # Experimentally, capping 7 threads seems to
273             # be a sweet spot.
274             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
275
276             self.api = arvados.safeapi.ThreadSafeApiCache(
277                 apiconfig=arvados.config.settings(),
278                 api_params={
279                     'num_retries': self.args.retries,
280                 },
281                 keep_params={
282                     'block_cache': block_cache,
283                     'num_prefetch_threads': prefetch_threads,
284                     'num_retries': self.args.retries,
285                 },
286                 version='v1',
287             )
288         except KeyError as e:
289             self.logger.error("Missing environment: %s", e)
290             exit(1)
291         # Do a sanity check that we have a working arvados host + token.
292         self.api.users().current().execute()
293
294     def _setup_mount(self):
295         self.operations = Operations(
296             os.getuid(),
297             os.getgid(),
298             api_client=self.api,
299             encoding=self.args.encoding,
300             inode_cache=InodeCache(cap=self.args.directory_cache),
301             enable_write=self.args.enable_write)
302
303         if self.args.crunchstat_interval:
304             statsthread = threading.Thread(
305                 target=crunchstat.statlogger,
306                 args=(self.args.crunchstat_interval,
307                       self.api.keep,
308                       self.operations))
309             statsthread.daemon = True
310             statsthread.start()
311
312         usr = self.api.users().current().execute(num_retries=self.args.retries)
313         now = time.time()
314         dir_class = None
315         dir_args = [
316             llfuse.ROOT_INODE,
317             self.operations.inodes,
318             self.api,
319             self.args.retries,
320             self.args.enable_write,
321             self.args.filters,
322         ]
323         mount_readme = False
324
325         storage_classes = None
326         if self.args.storage_classes is not None:
327             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
328             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
329
330         if self.args.collection is not None:
331             # Set up the request handler with the collection at the root
332             # First check that the collection is readable
333             self.api.collections().get(uuid=self.args.collection).execute()
334             self.args.mode = 'collection'
335             dir_class = CollectionDirectory
336             dir_args.append(self.args.collection)
337         elif self.args.project is not None:
338             self.args.mode = 'project'
339             dir_class = ProjectDirectory
340             dir_args.append(
341                 self.api.groups().get(uuid=self.args.project).execute(
342                     num_retries=self.args.retries))
343
344         if (self.args.mount_by_id or
345             self.args.mount_by_pdh or
346             self.args.mount_by_tag or
347             self.args.mount_home or
348             self.args.mount_shared or
349             self.args.mount_tmp):
350             if self.args.mode is not None:
351                 sys.exit(
352                     "Cannot combine '{}' mode with custom --mount-* options.".
353                     format(self.args.mode))
354         elif self.args.mode is None:
355             # If no --mount-custom or custom mount args, --all is the default
356             self.args.mode = 'all'
357
358         if self.args.mode in ['by_id', 'by_pdh']:
359             # Set up the request handler with the 'magic directory' at the root
360             dir_class = MagicDirectory
361             dir_args.append(self.args.mode == 'by_pdh')
362         elif self.args.mode == 'by_tag':
363             dir_class = TagsDirectory
364         elif self.args.mode == 'shared':
365             dir_class = SharedDirectory
366             dir_args.append(usr)
367         elif self.args.mode == 'home':
368             dir_class = ProjectDirectory
369             dir_args.append(usr)
370             dir_args.append(True)
371         elif self.args.mode == 'all':
372             self.args.mount_by_id = ['by_id']
373             self.args.mount_by_tag = ['by_tag']
374             self.args.mount_home = ['home']
375             self.args.mount_shared = ['shared']
376             mount_readme = True
377
378         if dir_class is not None:
379             if dir_class in [TagsDirectory, CollectionDirectory]:
380                 ent = dir_class(*dir_args)
381             else:
382                 ent = dir_class(*dir_args, storage_classes=storage_classes)
383             self.operations.inodes.add_entry(ent)
384             self.listen_for_events = ent.want_event_subscribe()
385             return
386
387         e = self.operations.inodes.add_entry(Directory(
388             llfuse.ROOT_INODE,
389             self.operations.inodes,
390             lambda: self.api.config(),
391             self.args.enable_write,
392             self.args.filters,
393         ))
394         dir_args[0] = e.inode
395
396         for name in self.args.mount_by_id:
397             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
398         for name in self.args.mount_by_pdh:
399             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
400         for name in self.args.mount_by_tag:
401             self._add_mount(e, name, TagsDirectory(*dir_args))
402         for name in self.args.mount_home:
403             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
404         for name in self.args.mount_shared:
405             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
406         for name in self.args.mount_tmp:
407             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
408
409         if mount_readme:
410             text = self._readme_text(
411                 arvados.config.get('ARVADOS_API_HOST'),
412                 usr['email'])
413             self._add_mount(e, 'README', StringFile(e.inode, text, now))
414
415     def _add_mount(self, tld, name, ent):
416         if name in ['', '.', '..'] or '/' in name:
417             sys.exit("Mount point '{}' is not supported.".format(name))
418         tld._entries[name] = self.operations.inodes.add_entry(ent)
419         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
420
421     def _readme_text(self, api_host, user_email):
422         return '''
423 Welcome to Arvados!  This directory provides file system access to
424 files and objects available on the Arvados installation located at
425 '{}' using credentials for user '{}'.
426
427 From here, the following directories are available:
428
429   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
430   by_tag/    Access to Keep collections organized by tag.
431   home/      The contents of your home project.
432   shared/    Projects shared with you.
433
434 '''.format(api_host, user_email)
435
436     def _run_exec(self):
437         rc = 255
438         with self:
439             try:
440                 sp = subprocess.Popen(self.args.exec_args, shell=False)
441
442                 # forward signals to the process.
443                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
444                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
445                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
446
447                 # wait for process to complete.
448                 rc = sp.wait()
449
450                 # restore default signal handlers.
451                 signal.signal(signal.SIGINT, signal.SIG_DFL)
452                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
453                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
454             except Exception as e:
455                 self.logger.exception(
456                     'arv-mount: exception during exec %s', self.args.exec_args)
457                 try:
458                     rc = e.errno
459                 except AttributeError:
460                     pass
461         exit(rc)
462
463     def _run_standalone(self):
464         try:
465             self.daemon = not self.args.foreground
466             with self:
467                 self.llfuse_thread.join(timeout=None)
468         except Exception as e:
469             self.logger.exception('arv-mount: exception during mount: %s', e)
470             exit(getattr(e, 'errno', 1))
471         exit(0)
472
473     def _llfuse_main(self):
474         try:
475             llfuse.main(workers=1)
476         except:
477             llfuse.close(unmount=False)
478             raise
479         llfuse.close()