Merge branch '21254-test-race'
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description='''Mount Keep data under the local filesystem.  Default mode is --home''',
32             epilog="""
33     Note: When using the --exec feature, you must either specify the
34     mountpoint before --exec, or mark the end of your --exec arguments
35     with "--".
36             """)
37         self.add_argument('--version', action='version',
38                           version=u"%s %s" % (sys.argv[0], __version__),
39                           help='Print version and exit.')
40         self.add_argument('mountpoint', type=str, help="""Mount point.""")
41         self.add_argument('--allow-other', action='store_true',
42                             help="""Let other users read the mount""")
43         self.add_argument('--subtype', type=str, metavar='STRING',
44                             help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")
45
46         mode = self.add_mutually_exclusive_group()
47
48         mode.add_argument('--all', action='store_const', const='all', dest='mode',
49                                 help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
50         mode.add_argument('--custom', action='store_const', const=None, dest='mode',
51                                 help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
52         mode.add_argument('--home', action='store_const', const='home', dest='mode',
53                                 help="""Mount only the user's home project.""")
54         mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
55                                 help="""Mount only list of projects shared with the user.""")
56         mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
57                                 help="""Mount subdirectories listed by tag.""")
58         mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
59                                 help="""Mount subdirectories listed by portable data hash or uuid.""")
60         mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
61                                 help="""Mount subdirectories listed by portable data hash.""")
62         mode.add_argument('--project', type=str, metavar='UUID',
63                                 help="""Mount the specified project.""")
64         mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
65                                 help="""Mount only the specified collection.""")
66
67         mounts = self.add_argument_group('Custom mount options')
68         mounts.add_argument('--mount-by-pdh',
69                             type=str, metavar='PATH', action='append', default=[],
70                             help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
71         mounts.add_argument('--mount-by-id',
72                             type=str, metavar='PATH', action='append', default=[],
73                             help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
74         mounts.add_argument('--mount-by-tag',
75                             type=str, metavar='PATH', action='append', default=[],
76                             help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
77         mounts.add_argument('--mount-home',
78                             type=str, metavar='PATH', action='append', default=[],
79                             help="Mount the current user's home project at mountpoint/PATH.")
80         mounts.add_argument('--mount-shared',
81                             type=str, metavar='PATH', action='append', default=[],
82                             help="Mount projects shared with the current user at mountpoint/PATH.")
83         mounts.add_argument('--mount-tmp',
84                             type=str, metavar='PATH', action='append', default=[],
85                             help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")
86
87
88         self.add_argument('--debug', action='store_true', help="""Debug mode""")
89         self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
90         self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
91         self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")
92
93         self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 8 GiB for disk-based cache or 256 MiB with RAM-only cache)", default=0)
94         self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128 MiB)", default=128*1024*1024)
95
96         cachetype = self.add_mutually_exclusive_group()
97         cachetype.add_argument('--ram-cache', action='store_false', dest='disk_cache', help="Use in-memory caching only", default=True)
98         cachetype.add_argument('--disk-cache', action='store_true', dest='disk_cache', help="Use disk based caching (default)", default=True)
99
100         self.add_argument('--disk-cache-dir', type=str, help="Disk cache location (default ~/.cache/arvados/keep)", default=None)
101
102         self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)
103
104         self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
105         self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)
106         self.add_argument('--storage-classes', type=str, metavar='CLASSES', help="Specify comma separated list of storage classes to be used when saving data of new collections", default=None)
107
108         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
109
110         unmount = self.add_mutually_exclusive_group()
111         unmount.add_argument('--unmount', action='store_true', default=False,
112                              help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
113         unmount.add_argument('--unmount-all', action='store_true', default=False,
114                              help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
115         unmount.add_argument('--replace', action='store_true', default=False,
116                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
117         self.add_argument('--unmount-timeout',
118                           type=float, default=2.0,
119                           help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")
120
121         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
122                             dest="exec_args", metavar=('command', 'args', '...', '--'),
123                             help="""Mount, run a command, then unmount and exit""")
124
125
126 class Mount(object):
127     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
128         self.daemon = False
129         self.logger = logger
130         self.args = args
131         self.listen_for_events = False
132
133         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
134         if self.args.logfile:
135             self.args.logfile = os.path.realpath(self.args.logfile)
136
137         try:
138             self._setup_logging()
139         except Exception as e:
140             self.logger.exception("exception during setup: %s", e)
141             exit(1)
142
143         try:
144             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
145
146             minlimit = 10240
147             if self.args.file_cache:
148                 # Adjust the file handle limit so it can meet
149                 # the desired cache size. Multiply by 8 because the
150                 # number of 64 MiB cache slots that keepclient
151                 # allocates is RLIMIT_NOFILE / 8
152                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
153
154             if nofile_limit[0] < minlimit:
155                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
156
157             if minlimit > nofile_limit[1]:
158                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
159
160         except Exception as e:
161             self.logger.warning("unable to adjust file handle limit: %s", e)
162
163         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
164         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
165
166         try:
167             self._setup_api()
168             self._setup_mount()
169         except Exception as e:
170             self.logger.exception("exception during setup: %s", e)
171             exit(1)
172
173     def __enter__(self):
174         if self.args.replace:
175             unmount(path=self.args.mountpoint,
176                     timeout=self.args.unmount_timeout)
177         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
178         if self.daemon:
179             daemon.DaemonContext(
180                 working_directory=os.path.dirname(self.args.mountpoint),
181                 files_preserve=list(range(
182                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
183             ).open()
184         if self.listen_for_events and not self.args.disable_event_listening:
185             self.operations.listen_for_events()
186         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
187         self.llfuse_thread.daemon = True
188         self.llfuse_thread.start()
189         self.operations.initlock.wait()
190         return self
191
192     def __exit__(self, exc_type, exc_value, traceback):
193         if self.operations.events:
194             self.operations.events.close(timeout=self.args.unmount_timeout)
195         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
196         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
197         if self.llfuse_thread.is_alive():
198             self.logger.warning("Mount.__exit__:"
199                                 " llfuse thread still alive %fs after umount"
200                                 " -- abandoning and exiting anyway",
201                                 self.args.unmount_timeout)
202
203     def run(self):
204         if self.args.unmount or self.args.unmount_all:
205             unmount(path=self.args.mountpoint,
206                     subtype=self.args.subtype,
207                     timeout=self.args.unmount_timeout,
208                     recursive=self.args.unmount_all)
209         elif self.args.exec_args:
210             self._run_exec()
211         else:
212             self._run_standalone()
213
214     def _fuse_options(self):
215         """FUSE mount options; see mount.fuse(8)"""
216         opts = [optname for optname in ['allow_other', 'debug']
217                 if getattr(self.args, optname)]
218         # Increase default read/write size from 4KiB to 128KiB
219         opts += ["big_writes", "max_read=131072"]
220         if self.args.subtype:
221             opts += ["subtype="+self.args.subtype]
222         return opts
223
224     def _setup_logging(self):
225         # Configure a log handler based on command-line switches.
226         if self.args.logfile:
227             log_handler = logging.FileHandler(self.args.logfile)
228             log_handler.setFormatter(logging.Formatter(
229                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
230                 '%Y-%m-%d %H:%M:%S'))
231         else:
232             log_handler = None
233
234         if log_handler is not None:
235             arvados.logger.removeHandler(arvados.log_handler)
236             arvados.logger.addHandler(log_handler)
237
238         if self.args.debug:
239             arvados.logger.setLevel(logging.DEBUG)
240             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
241             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
242             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
243             self.logger.debug("arv-mount debugging enabled")
244
245         self.logger.info("%s %s started", sys.argv[0], __version__)
246         self.logger.info("enable write is %s", self.args.enable_write)
247
248     def _setup_api(self):
249         try:
250             # default value of file_cache is 0, this tells KeepBlockCache to
251             # choose a default based on whether disk_cache is enabled or not.
252
253             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
254                                                       disk_cache=self.args.disk_cache,
255                                                       disk_cache_dir=self.args.disk_cache_dir)
256
257             # If there's too many prefetch threads and you
258             # max out the CPU, delivering data to the FUSE
259             # layer actually ends up being slower.
260             # Experimentally, capping 7 threads seems to
261             # be a sweet spot.
262             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
263
264             self.api = arvados.safeapi.ThreadSafeApiCache(
265                 apiconfig=arvados.config.settings(),
266                 api_params={
267                     'num_retries': self.args.retries,
268                 },
269                 keep_params={
270                     'block_cache': block_cache,
271                     'num_prefetch_threads': prefetch_threads,
272                     'num_retries': self.args.retries,
273                 },
274                 version='v1',
275             )
276         except KeyError as e:
277             self.logger.error("Missing environment: %s", e)
278             exit(1)
279         # Do a sanity check that we have a working arvados host + token.
280         self.api.users().current().execute()
281
282     def _setup_mount(self):
283         self.operations = Operations(
284             os.getuid(),
285             os.getgid(),
286             api_client=self.api,
287             encoding=self.args.encoding,
288             inode_cache=InodeCache(cap=self.args.directory_cache),
289             enable_write=self.args.enable_write)
290
291         if self.args.crunchstat_interval:
292             statsthread = threading.Thread(
293                 target=crunchstat.statlogger,
294                 args=(self.args.crunchstat_interval,
295                       self.api.keep,
296                       self.operations))
297             statsthread.daemon = True
298             statsthread.start()
299
300         usr = self.api.users().current().execute(num_retries=self.args.retries)
301         now = time.time()
302         dir_class = None
303         dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
304         mount_readme = False
305
306         storage_classes = None
307         if self.args.storage_classes is not None:
308             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
309             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
310
311         if self.args.collection is not None:
312             # Set up the request handler with the collection at the root
313             # First check that the collection is readable
314             self.api.collections().get(uuid=self.args.collection).execute()
315             self.args.mode = 'collection'
316             dir_class = CollectionDirectory
317             dir_args.append(self.args.collection)
318         elif self.args.project is not None:
319             self.args.mode = 'project'
320             dir_class = ProjectDirectory
321             dir_args.append(
322                 self.api.groups().get(uuid=self.args.project).execute(
323                     num_retries=self.args.retries))
324
325         if (self.args.mount_by_id or
326             self.args.mount_by_pdh or
327             self.args.mount_by_tag or
328             self.args.mount_home or
329             self.args.mount_shared or
330             self.args.mount_tmp):
331             if self.args.mode is not None:
332                 sys.exit(
333                     "Cannot combine '{}' mode with custom --mount-* options.".
334                     format(self.args.mode))
335         elif self.args.mode is None:
336             # If no --mount-custom or custom mount args, --all is the default
337             self.args.mode = 'all'
338
339         if self.args.mode in ['by_id', 'by_pdh']:
340             # Set up the request handler with the 'magic directory' at the root
341             dir_class = MagicDirectory
342             dir_args.append(self.args.mode == 'by_pdh')
343         elif self.args.mode == 'by_tag':
344             dir_class = TagsDirectory
345         elif self.args.mode == 'shared':
346             dir_class = SharedDirectory
347             dir_args.append(usr)
348         elif self.args.mode == 'home':
349             dir_class = ProjectDirectory
350             dir_args.append(usr)
351             dir_args.append(True)
352         elif self.args.mode == 'all':
353             self.args.mount_by_id = ['by_id']
354             self.args.mount_by_tag = ['by_tag']
355             self.args.mount_home = ['home']
356             self.args.mount_shared = ['shared']
357             mount_readme = True
358
359         if dir_class is not None:
360             if dir_class in [TagsDirectory, CollectionDirectory]:
361                 ent = dir_class(*dir_args)
362             else:
363                 ent = dir_class(*dir_args, storage_classes=storage_classes)
364             self.operations.inodes.add_entry(ent)
365             self.listen_for_events = ent.want_event_subscribe()
366             return
367
368         e = self.operations.inodes.add_entry(Directory(
369             llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
370         dir_args[0] = e.inode
371
372         for name in self.args.mount_by_id:
373             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
374         for name in self.args.mount_by_pdh:
375             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
376         for name in self.args.mount_by_tag:
377             self._add_mount(e, name, TagsDirectory(*dir_args))
378         for name in self.args.mount_home:
379             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
380         for name in self.args.mount_shared:
381             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
382         for name in self.args.mount_tmp:
383             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
384
385         if mount_readme:
386             text = self._readme_text(
387                 arvados.config.get('ARVADOS_API_HOST'),
388                 usr['email'])
389             self._add_mount(e, 'README', StringFile(e.inode, text, now))
390
391     def _add_mount(self, tld, name, ent):
392         if name in ['', '.', '..'] or '/' in name:
393             sys.exit("Mount point '{}' is not supported.".format(name))
394         tld._entries[name] = self.operations.inodes.add_entry(ent)
395         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
396
397     def _readme_text(self, api_host, user_email):
398         return '''
399 Welcome to Arvados!  This directory provides file system access to
400 files and objects available on the Arvados installation located at
401 '{}' using credentials for user '{}'.
402
403 From here, the following directories are available:
404
405   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
406   by_tag/    Access to Keep collections organized by tag.
407   home/      The contents of your home project.
408   shared/    Projects shared with you.
409
410 '''.format(api_host, user_email)
411
412     def _run_exec(self):
413         rc = 255
414         with self:
415             try:
416                 sp = subprocess.Popen(self.args.exec_args, shell=False)
417
418                 # forward signals to the process.
419                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
420                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
421                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
422
423                 # wait for process to complete.
424                 rc = sp.wait()
425
426                 # restore default signal handlers.
427                 signal.signal(signal.SIGINT, signal.SIG_DFL)
428                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
429                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
430             except Exception as e:
431                 self.logger.exception(
432                     'arv-mount: exception during exec %s', self.args.exec_args)
433                 try:
434                     rc = e.errno
435                 except AttributeError:
436                     pass
437         exit(rc)
438
439     def _run_standalone(self):
440         try:
441             self.daemon = not self.args.foreground
442             with self:
443                 self.llfuse_thread.join(timeout=None)
444         except Exception as e:
445             self.logger.exception('arv-mount: exception during mount: %s', e)
446             exit(getattr(e, 'errno', 1))
447         exit(0)
448
449     def _llfuse_main(self):
450         try:
451             llfuse.main()
452         except:
453             llfuse.close(unmount=False)
454             raise
455         llfuse.close()