Merge branch '20652-cwl-poll-select' refs #20652
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description='''Mount Keep data under the local filesystem.  Default mode is --home''',
32             epilog="""
33     Note: When using the --exec feature, you must either specify the
34     mountpoint before --exec, or mark the end of your --exec arguments
35     with "--".
36             """)
37         self.add_argument('--version', action='version',
38                           version=u"%s %s" % (sys.argv[0], __version__),
39                           help='Print version and exit.')
40         self.add_argument('mountpoint', type=str, help="""Mount point.""")
41         self.add_argument('--allow-other', action='store_true',
42                             help="""Let other users read the mount""")
43         self.add_argument('--subtype', type=str, metavar='STRING',
44                             help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")
45
46         mode = self.add_mutually_exclusive_group()
47
48         mode.add_argument('--all', action='store_const', const='all', dest='mode',
49                                 help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
50         mode.add_argument('--custom', action='store_const', const=None, dest='mode',
51                                 help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
52         mode.add_argument('--home', action='store_const', const='home', dest='mode',
53                                 help="""Mount only the user's home project.""")
54         mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
55                                 help="""Mount only list of projects shared with the user.""")
56         mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
57                                 help="""Mount subdirectories listed by tag.""")
58         mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
59                                 help="""Mount subdirectories listed by portable data hash or uuid.""")
60         mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
61                                 help="""Mount subdirectories listed by portable data hash.""")
62         mode.add_argument('--project', type=str, metavar='UUID',
63                                 help="""Mount the specified project.""")
64         mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
65                                 help="""Mount only the specified collection.""")
66
67         mounts = self.add_argument_group('Custom mount options')
68         mounts.add_argument('--mount-by-pdh',
69                             type=str, metavar='PATH', action='append', default=[],
70                             help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
71         mounts.add_argument('--mount-by-id',
72                             type=str, metavar='PATH', action='append', default=[],
73                             help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
74         mounts.add_argument('--mount-by-tag',
75                             type=str, metavar='PATH', action='append', default=[],
76                             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
77         mounts.add_argument('--mount-home',
78                             type=str, metavar='PATH', action='append', default=[],
79                             help="Mount the current user's home project at mountpoint/PATH.")
80         mounts.add_argument('--mount-shared',
81                             type=str, metavar='PATH', action='append', default=[],
82                             help="Mount projects shared with the current user at mountpoint/PATH.")
83         mounts.add_argument('--mount-tmp',
84                             type=str, metavar='PATH', action='append', default=[],
85                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
86
87
88         self.add_argument('--debug', action='store_true', help="""Debug mode""")
89         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
90         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
91         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
92
93         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
94         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
95
96         cachetype = self.add_mutually_exclusive_group()
97         cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
98         cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
99
100         self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
101
102         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
103
104         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
105         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
106         self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)
107
108         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
109
110         unmount = self.add_mutually_exclusive_group()
111         unmount.add_argument('--unmount', action='store_true', default=False,
112                              help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
113         unmount.add_argument('--unmount-all', action='store_true', default=False,
114                              help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
115         unmount.add_argument('--replace', action='store_true', default=False,
116                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
117         self.add_argument('--unmount-timeout',
118                           type=float, default=2.0,
119                           help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
120
121         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
122                             dest="exec_args", metavar=('command', 'args', '...', '--'),
123                             help="""Mount, run a command, then unmount and exit""")
124
125
126 class Mount(object):
127     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
128         self.daemon = False
129         self.logger = logger
130         self.args = args
131         self.listen_for_events = False
132
133         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
134         if self.args.logfile:
135             self.args.logfile = os.path.realpath(self.args.logfile)
136
137         try:
138             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
139             if nofile_limit[0] < 10240:
140                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(10240, nofile_limit[1]), nofile_limit[1]))
141         except Exception as e:
142             self.logger.warning("arv-mount: unable to adjust file handle limit: %s", e)
143
144         self.logger.debug("arv-mount: file handle limit is %s", resource.getrlimit(resource.RLIMIT_NOFILE))
145
146         try:
147             self._setup_logging()
148             self._setup_api()
149             self._setup_mount()
150         except Exception as e:
151             self.logger.exception("arv-mount: exception during setup: %s", e)
152             exit(1)
153
154     def __enter__(self):
155         if self.args.replace:
156             unmount(path=self.args.mountpoint,
157                     timeout=self.args.unmount_timeout)
158         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
159         if self.daemon:
160             daemon.DaemonContext(
161                 working_directory=os.path.dirname(self.args.mountpoint),
162                 files_preserve=list(range(
163                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
164             ).open()
165         if self.listen_for_events and not self.args.disable_event_listening:
166             self.operations.listen_for_events()
167         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
168         self.llfuse_thread.daemon = True
169         self.llfuse_thread.start()
170         self.operations.initlock.wait()
171         return self
172
173     def __exit__(self, exc_type, exc_value, traceback):
174         if self.operations.events:
175             self.operations.events.close(timeout=self.args.unmount_timeout)
176         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
177         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
178         if self.llfuse_thread.is_alive():
179             self.logger.warning("Mount.__exit__:"
180                                 " llfuse thread still alive %fs after umount"
181                                 " -- abandoning and exiting anyway",
182                                 self.args.unmount_timeout)
183
184     def run(self):
185         if self.args.unmount or self.args.unmount_all:
186             unmount(path=self.args.mountpoint,
187                     subtype=self.args.subtype,
188                     timeout=self.args.unmount_timeout,
189                     recursive=self.args.unmount_all)
190         elif self.args.exec_args:
191             self._run_exec()
192         else:
193             self._run_standalone()
194
195     def _fuse_options(self):
196         """FUSE mount options; see mount.fuse(8)"""
197         opts = [optname for optname in ['allow_other', 'debug']
198                 if getattr(self.args, optname)]
199         # Increase default read/write size from 4KiB to 128KiB
200         opts += ["big_writes", "max_read=131072"]
201         if self.args.subtype:
202             opts += ["subtype="+self.args.subtype]
203         return opts
204
205     def _setup_logging(self):
206         # Configure a log handler based on command-line switches.
207         if self.args.logfile:
208             log_handler = logging.FileHandler(self.args.logfile)
209             log_handler.setFormatter(logging.Formatter(
210                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
211                 '%Y-%m-%d %H:%M:%S'))
212         else:
213             log_handler = None
214
215         if log_handler is not None:
216             arvados.logger.removeHandler(arvados.log_handler)
217             arvados.logger.addHandler(log_handler)
218
219         if self.args.debug:
220             arvados.logger.setLevel(logging.DEBUG)
221             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
222             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
223             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
224             self.logger.debug("arv-mount debugging enabled")
225
226         self.logger.info("%s %s started", sys.argv[0], __version__)
227         self.logger.info("enable write is %s", self.args.enable_write)
228
229     def _setup_api(self):
230         try:
231             # default value of file_cache is 0, this tells KeepBlockCache to
232             # choose a default based on whether disk_cache is enabled or not.
233
234             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
235                                                       disk_cache=self.args.disk_cache,
236                                                       disk_cache_dir=self.args.disk_cache_dir)
237
238             # If there's too many prefetch threads and you
239             # max out the CPU, delivering data to the FUSE
240             # layer actually ends up being slower.
241             # Experimentally, capping 7 threads seems to
242             # be a sweet spot.
243             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
244
245             self.api = arvados.safeapi.ThreadSafeApiCache(
246                 apiconfig=arvados.config.settings(),
247                 api_params={
248                     'num_retries': self.args.retries,
249                 },
250                 keep_params={
251                     'block_cache': block_cache,
252                     'num_prefetch_threads': prefetch_threads,
253                     'num_retries': self.args.retries,
254                 },
255                 version='v1',
256             )
257         except KeyError as e:
258             self.logger.error("Missing environment: %s", e)
259             exit(1)
260         # Do a sanity check that we have a working arvados host + token.
261         self.api.users().current().execute()
262
263     def _setup_mount(self):
264         self.operations = Operations(
265             os.getuid(),
266             os.getgid(),
267             api_client=self.api,
268             encoding=self.args.encoding,
269             inode_cache=InodeCache(cap=self.args.directory_cache),
270             enable_write=self.args.enable_write)
271
272         if self.args.crunchstat_interval:
273             statsthread = threading.Thread(
274                 target=crunchstat.statlogger,
275                 args=(self.args.crunchstat_interval,
276                       self.api.keep,
277                       self.operations))
278             statsthread.daemon = True
279             statsthread.start()
280
281         usr = self.api.users().current().execute(num_retries=self.args.retries)
282         now = time.time()
283         dir_class = None
284         dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
285         mount_readme = False
286
287         storage_classes = None
288         if self.args.storage_classes is not None:
289             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
290             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
291
292         if self.args.collection is not None:
293             # Set up the request handler with the collection at the root
294             # First check that the collection is readable
295             self.api.collections().get(uuid=self.args.collection).execute()
296             self.args.mode = 'collection'
297             dir_class = CollectionDirectory
298             dir_args.append(self.args.collection)
299         elif self.args.project is not None:
300             self.args.mode = 'project'
301             dir_class = ProjectDirectory
302             dir_args.append(
303                 self.api.groups().get(uuid=self.args.project).execute(
304                     num_retries=self.args.retries))
305
306         if (self.args.mount_by_id or
307             self.args.mount_by_pdh or
308             self.args.mount_by_tag or
309             self.args.mount_home or
310             self.args.mount_shared or
311             self.args.mount_tmp):
312             if self.args.mode is not None:
313                 sys.exit(
314                     "Cannot combine '{}' mode with custom --mount-* options.".
315                     format(self.args.mode))
316         elif self.args.mode is None:
317             # If no --mount-custom or custom mount args, --all is the default
318             self.args.mode = 'all'
319
320         if self.args.mode in ['by_id', 'by_pdh']:
321             # Set up the request handler with the 'magic directory' at the root
322             dir_class = MagicDirectory
323             dir_args.append(self.args.mode == 'by_pdh')
324         elif self.args.mode == 'by_tag':
325             dir_class = TagsDirectory
326         elif self.args.mode == 'shared':
327             dir_class = SharedDirectory
328             dir_args.append(usr)
329         elif self.args.mode == 'home':
330             dir_class = ProjectDirectory
331             dir_args.append(usr)
332             dir_args.append(True)
333         elif self.args.mode == 'all':
334             self.args.mount_by_id = ['by_id']
335             self.args.mount_by_tag = ['by_tag']
336             self.args.mount_home = ['home']
337             self.args.mount_shared = ['shared']
338             mount_readme = True
339
340         if dir_class is not None:
341             if dir_class in [TagsDirectory, CollectionDirectory]:
342                 ent = dir_class(*dir_args)
343             else:
344                 ent = dir_class(*dir_args, storage_classes=storage_classes)
345             self.operations.inodes.add_entry(ent)
346             self.listen_for_events = ent.want_event_subscribe()
347             return
348
349         e = self.operations.inodes.add_entry(Directory(
350             llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
351         dir_args[0] = e.inode
352
353         for name in self.args.mount_by_id:
354             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
355         for name in self.args.mount_by_pdh:
356             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
357         for name in self.args.mount_by_tag:
358             self._add_mount(e, name, TagsDirectory(*dir_args))
359         for name in self.args.mount_home:
360             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
361         for name in self.args.mount_shared:
362             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
363         for name in self.args.mount_tmp:
364             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
365
366         if mount_readme:
367             text = self._readme_text(
368                 arvados.config.get('ARVADOS_API_HOST'),
369                 usr['email'])
370             self._add_mount(e, 'README', StringFile(e.inode, text, now))
371
372     def _add_mount(self, tld, name, ent):
373         if name in ['', '.', '..'] or '/' in name:
374             sys.exit("Mount point '{}' is not supported.".format(name))
375         tld._entries[name] = self.operations.inodes.add_entry(ent)
376         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
377
378     def _readme_text(self, api_host, user_email):
379         return '''
380 Welcome to Arvados!  This directory provides file system access to
381 files and objects available on the Arvados installation located at
382 '{}' using credentials for user '{}'.
383
384 From here, the following directories are available:
385
386   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
387   by_tag/    Access to Keep collections organized by tag.
388   home/      The contents of your home project.
389   shared/    Projects shared with you.
390
391 '''.format(api_host, user_email)
392
393     def _run_exec(self):
394         rc = 255
395         with self:
396             try:
397                 sp = subprocess.Popen(self.args.exec_args, shell=False)
398
399                 # forward signals to the process.
400                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
401                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
402                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
403
404                 # wait for process to complete.
405                 rc = sp.wait()
406
407                 # restore default signal handlers.
408                 signal.signal(signal.SIGINT, signal.SIG_DFL)
409                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
410                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
411             except Exception as e:
412                 self.logger.exception(
413                     'arv-mount: exception during exec %s', self.args.exec_args)
414                 try:
415                     rc = e.errno
416                 except AttributeError:
417                     pass
418         exit(rc)
419
420     def _run_standalone(self):
421         try:
422             self.daemon = not self.args.foreground
423             with self:
424                 self.llfuse_thread.join(timeout=None)
425         except Exception as e:
426             self.logger.exception('arv-mount: exception during mount: %s', e)
427             exit(getattr(e, 'errno', 1))
428         exit(0)
429
430     def _llfuse_main(self):
431         try:
432             llfuse.main()
433         except:
434             llfuse.close(unmount=False)
435             raise
436         llfuse.close()