14965: Fixes crunchstat test assertion
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
import argparse
import logging
import os
import resource
import signal
import subprocess
import sys
import threading
import time

from builtins import object
from builtins import range
from future.utils import native_str

import arvados
import arvados.commands._util as arv_cmd
import daemon
import llfuse

from arvados_fuse import crunchstat
from arvados_fuse import *
from arvados_fuse.unmount import unmount
from arvados_fuse._version import __version__
25
class ArgumentParser(argparse.ArgumentParser):
    """Command-line parser for arv-mount.

    The mode options --all, --custom, --home, --shared, --by-tag, --by-id and
    --by-pdh are mutually exclusive with each other and with --project and
    --collection; the constant-valued ones all store into ``mode``.  When no
    mode is chosen, ``Mount`` uses --all unless any --mount-* option is
    given, in which case the custom layout is built from those options.
    """

    def __init__(self):
        super(ArgumentParser, self).__init__(
            parents=[arv_cmd.retry_opt],
            # The effective default is --all (see the --all help text); the
            # previous wording claimed --home, which was wrong.
            description='''Mount Keep data under the local filesystem.  Default mode is --all (or --custom if any --mount-* options are given).''',
            epilog="""
    Note: When using the --exec feature, you must either specify the
    mountpoint before --exec, or mark the end of your --exec arguments
    with "--".
            """)
        self.add_argument('--version', action='version',
                          version=u"%s %s" % (sys.argv[0], __version__),
                          help='Print version and exit.')
        self.add_argument('mountpoint', type=str, help="""Mount point.""")
        self.add_argument('--allow-other', action='store_true',
                          help="""Let other users read the mount""")
        self.add_argument('--subtype', type=str, metavar='STRING',
                          help="""Report mounted filesystem type as "fuse.STRING", instead of just "fuse".""")

        mode = self.add_mutually_exclusive_group()

        mode.add_argument('--all', action='store_const', const='all', dest='mode',
                          help="""Mount a subdirectory for each mode: home, shared, by_tag, by_id (default if no --mount-* arguments are given).""")
        mode.add_argument('--custom', action='store_const', const=None, dest='mode',
                          help="""Mount a top level meta-directory with subdirectories as specified by additional --mount-* arguments (default if any --mount-* arguments are given).""")
        mode.add_argument('--home', action='store_const', const='home', dest='mode',
                          help="""Mount only the user's home project.""")
        mode.add_argument('--shared', action='store_const', const='shared', dest='mode',
                          help="""Mount only list of projects shared with the user.""")
        mode.add_argument('--by-tag', action='store_const', const='by_tag', dest='mode',
                          help="""Mount subdirectories listed by tag.""")
        mode.add_argument('--by-id', action='store_const', const='by_id', dest='mode',
                          help="""Mount subdirectories listed by portable data hash or uuid.""")
        mode.add_argument('--by-pdh', action='store_const', const='by_pdh', dest='mode',
                          help="""Mount subdirectories listed by portable data hash.""")
        mode.add_argument('--project', type=str, metavar='UUID',
                          help="""Mount the specified project.""")
        mode.add_argument('--collection', type=str, metavar='UUID_or_PDH',
                          help="""Mount only the specified collection.""")

        # Each --mount-* option may be repeated; every PATH becomes a
        # top-level entry of the custom layout.
        mounts = self.add_argument_group('Custom mount options')
        mounts.add_argument('--mount-by-pdh',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Mount each readable collection at mountpoint/PATH/P where P is the collection's portable data hash.")
        mounts.add_argument('--mount-by-id',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Mount each readable collection at mountpoint/PATH/UUID and mountpoint/PATH/PDH where PDH is the collection's portable data hash and UUID is its UUID.")
        mounts.add_argument('--mount-by-tag',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Mount all collections with tag TAG at mountpoint/PATH/TAG/UUID.")
        mounts.add_argument('--mount-home',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Mount the current user's home project at mountpoint/PATH.")
        mounts.add_argument('--mount-shared',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Mount projects shared with the current user at mountpoint/PATH.")
        mounts.add_argument('--mount-tmp',
                            type=str, metavar='PATH', action='append', default=[],
                            help="Create a new collection, mount it in read/write mode at mountpoint/PATH, and delete it when unmounting.")

        self.add_argument('--debug', action='store_true', help="""Debug mode""")
        self.add_argument('--logfile', help="""Write debug logs and errors to the specified file (default stderr).""")
        self.add_argument('--foreground', action='store_true', help="""Run in foreground (default is to daemonize unless --exec specified)""", default=False)
        self.add_argument('--encoding', type=str, help="Character encoding to use for filesystem, default is utf-8 (see Python codec registry for list of available encodings)", default="utf-8")

        self.add_argument('--file-cache', type=int, help="File data cache size, in bytes (default 256MiB)", default=256*1024*1024)
        self.add_argument('--directory-cache', type=int, help="Directory data cache size, in bytes (default 128MiB)", default=128*1024*1024)

        self.add_argument('--disable-event-listening', action='store_true', help="Don't subscribe to events on the API server", dest="disable_event_listening", default=False)

        # --read-only and --read-write share the enable_write destination;
        # both default to False, so the mount is read-only unless asked.
        self.add_argument('--read-only', action='store_false', help="Mount will be read only (default)", dest="enable_write", default=False)
        self.add_argument('--read-write', action='store_true', help="Mount will be read-write", dest="enable_write", default=False)

        self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)

        # Named unmount_group so it does not shadow the imported unmount()
        # function inside this method.
        unmount_group = self.add_mutually_exclusive_group()
        unmount_group.add_argument('--unmount', action='store_true', default=False,
                                   help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_group.add_argument('--unmount-all', action='store_true', default=False,
                                   help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
        unmount_group.add_argument('--replace', action='store_true', default=False,
                                   help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
        self.add_argument('--unmount-timeout',
                          type=float, default=2.0,
                          help="Time to wait for graceful shutdown after --exec program exits and filesystem is unmounted")

        # REMAINDER swallows everything after --exec (including options) as
        # the command line to run.
        self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                          dest="exec_args", metavar=('command', 'args', '...', '--'),
                          help="""Mount, run a command, then unmount and exit""")
115
116
class Mount(object):
    """An arv-mount FUSE filesystem configured from parsed command-line args.

    Construction sets up logging, the API client and the directory tree.
    The instance is a context manager: entering mounts the filesystem and
    starts the llfuse main loop on a background thread, and exiting unmounts
    it.  ``run()`` dispatches to unmount-only, --exec, or standalone mode.
    """

    def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
        """Prepare a mount described by ``args``.

        :param args: namespace produced by ``ArgumentParser``. ``mountpoint``
            and ``logfile`` are rewritten in place to absolute real paths.
        :param logger: logger for mount-level messages.

        Exits the process with status 1 if any setup step raises.
        """
        self.daemon = False
        self.logger = logger
        self.args = args
        self.listen_for_events = False

        # Resolve paths up front: daemonizing later changes the working
        # directory (see __enter__), which would break relative paths.
        self.args.mountpoint = os.path.realpath(self.args.mountpoint)
        if self.args.logfile:
            self.args.logfile = os.path.realpath(self.args.logfile)

        try:
            self._setup_logging()
            self._setup_api()
            self._setup_mount()
        except Exception as e:
            self.logger.exception("arv-mount: exception during setup: %s", e)
            sys.exit(1)

    def __enter__(self):
        """Mount the filesystem and start serving it on a daemon thread.

        Optionally replaces an existing mount (--replace) and detaches into
        a daemon first.  Waits on ``operations.initlock`` before returning
        ``self``.
        """
        if self.args.replace:
            unmount(path=self.args.mountpoint,
                    timeout=self.args.unmount_timeout)
        llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
        if self.daemon:
            # Preserve every descriptor >= 3 (anything opened so far, such as
            # by llfuse.init above) so daemonizing does not close it.
            daemon.DaemonContext(
                working_directory=os.path.dirname(self.args.mountpoint),
                files_preserve=list(range(
                    3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
            ).open()
        if self.listen_for_events and not self.args.disable_event_listening:
            self.operations.listen_for_events()
        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
        self.llfuse_thread.daemon = True
        self.llfuse_thread.start()
        # NOTE(review): presumably initlock is set once the filesystem has
        # finished initializing -- confirm in Operations.
        self.operations.initlock.wait()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop event listening, lazily unmount, and join the llfuse thread.

        Waits at most ``args.unmount_timeout`` seconds for the thread.  If it
        is still alive after that, logs a warning and abandons it.
        """
        if self.operations.events:
            self.operations.events.close(timeout=self.args.unmount_timeout)
        subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
        if self.llfuse_thread.is_alive():
            self.logger.warning("Mount.__exit__:"
                                " llfuse thread still alive %fs after umount"
                                " -- abandoning and exiting anyway",
                                self.args.unmount_timeout)

    def run(self):
        """Perform the action selected on the command line.

        The choices are unmount-only, mount and run --exec, or mount
        standalone.
        """
        if self.args.unmount or self.args.unmount_all:
            unmount(path=self.args.mountpoint,
                    subtype=self.args.subtype,
                    timeout=self.args.unmount_timeout,
                    recursive=self.args.unmount_all)
        elif self.args.exec_args:
            self._run_exec()
        else:
            self._run_standalone()

    def _fuse_options(self):
        """FUSE mount options; see mount.fuse(8)"""
        opts = [optname for optname in ['allow_other', 'debug']
                if getattr(self.args, optname)]
        # Increase default read/write size from 4KiB to 128KiB
        opts += ["big_writes", "max_read=131072"]
        if self.args.subtype:
            opts += ["subtype="+self.args.subtype]
        return opts

    def _setup_logging(self):
        """Route arvados logging to --logfile if given, and apply --debug levels."""
        # Configure a log handler based on command-line switches.
        if self.args.logfile:
            log_handler = logging.FileHandler(self.args.logfile)
            log_handler.setFormatter(logging.Formatter(
                '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
                '%Y-%m-%d %H:%M:%S'))
        else:
            log_handler = None

        if log_handler is not None:
            arvados.logger.removeHandler(arvados.log_handler)
            arvados.logger.addHandler(log_handler)

        if self.args.debug:
            arvados.logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
            logging.getLogger('arvados.api').setLevel(logging.DEBUG)
            logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
            self.logger.debug("arv-mount debugging enabled")

        self.logger.info("%s %s started", sys.argv[0], __version__)
        self.logger.info("enable write is %s", self.args.enable_write)

    def _setup_api(self):
        """Create the thread-safe API client and check the credentials work.

        Exits with status 1 if building the client raises ``KeyError``.  That
        error is logged as missing environment.
        """
        try:
            self.api = arvados.safeapi.ThreadSafeApiCache(
                apiconfig=arvados.config.settings(),
                keep_params={
                    'block_cache': arvados.keep.KeepBlockCache(self.args.file_cache),
                    'num_retries': self.args.retries,
                })
        except KeyError as e:
            self.logger.error("Missing environment: %s", e)
            sys.exit(1)
        # Do a sanity check that we have a working arvados host + token.
        # Use the configured retries, like the other API calls in this class.
        self.api.users().current().execute(num_retries=self.args.retries)

    def _setup_mount(self):
        """Create the FUSE operations object and populate its inode tree.

        Resolves the effective mode and rewrites ``args.mode`` in place.  For
        --all, it also rewrites the ``mount_*`` lists.  It optionally starts
        the crunchstat logging thread, and sets ``listen_for_events`` if any
        mounted directory wants event subscriptions.

        Exits the process if mode and --mount-* options conflict, or if a
        mount name is unsupported.
        """
        self.operations = Operations(
            os.getuid(),
            os.getgid(),
            api_client=self.api,
            encoding=self.args.encoding,
            inode_cache=InodeCache(cap=self.args.directory_cache),
            enable_write=self.args.enable_write)

        if self.args.crunchstat_interval:
            statsthread = threading.Thread(
                target=crunchstat.statlogger,
                args=(self.args.crunchstat_interval,
                      self.api.keep,
                      self.operations))
            statsthread.daemon = True
            statsthread.start()

        usr = self.api.users().current().execute(num_retries=self.args.retries)
        now = time.time()
        dir_class = None
        dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries]
        mount_readme = False

        if self.args.collection is not None:
            # Set up the request handler with the collection at the root
            # First check that the collection is readable
            self.api.collections().get(uuid=self.args.collection).execute(
                num_retries=self.args.retries)
            self.args.mode = 'collection'
            dir_class = CollectionDirectory
            dir_args.append(self.args.collection)
        elif self.args.project is not None:
            self.args.mode = 'project'
            dir_class = ProjectDirectory
            dir_args.append(
                self.api.groups().get(uuid=self.args.project).execute(
                    num_retries=self.args.retries))

        if (self.args.mount_by_id or
            self.args.mount_by_pdh or
            self.args.mount_by_tag or
            self.args.mount_home or
            self.args.mount_shared or
            self.args.mount_tmp):
            if self.args.mode is not None:
                sys.exit(
                    "Cannot combine '{}' mode with custom --mount-* options.".
                    format(self.args.mode))
        elif self.args.mode is None:
            # If no --mount-custom or custom mount args, --all is the default
            self.args.mode = 'all'

        if self.args.mode in ['by_id', 'by_pdh']:
            # Set up the request handler with the 'magic directory' at the root
            dir_class = MagicDirectory
            dir_args.append(self.args.mode == 'by_pdh')
        elif self.args.mode == 'by_tag':
            dir_class = TagsDirectory
        elif self.args.mode == 'shared':
            dir_class = SharedDirectory
            dir_args.append(usr)
        elif self.args.mode == 'home':
            dir_class = ProjectDirectory
            dir_args.append(usr)
            dir_args.append(True)
        elif self.args.mode == 'all':
            self.args.mount_by_id = ['by_id']
            self.args.mount_by_tag = ['by_tag']
            self.args.mount_home = ['home']
            self.args.mount_shared = ['shared']
            mount_readme = True

        # Single-directory modes put that directory directly at the root.
        if dir_class is not None:
            ent = dir_class(*dir_args)
            self.operations.inodes.add_entry(ent)
            self.listen_for_events = ent.want_event_subscribe()
            return

        # Otherwise build a plain root directory holding one entry per name.
        e = self.operations.inodes.add_entry(Directory(
            llfuse.ROOT_INODE, self.operations.inodes))
        dir_args[0] = e.inode

        for name in self.args.mount_by_id:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False))
        for name in self.args.mount_by_pdh:
            self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
        for name in self.args.mount_by_tag:
            self._add_mount(e, name, TagsDirectory(*dir_args))
        for name in self.args.mount_home:
            self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True))
        for name in self.args.mount_shared:
            self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True))
        for name in self.args.mount_tmp:
            self._add_mount(e, name, TmpCollectionDirectory(*dir_args))

        if mount_readme:
            text = self._readme_text(
                arvados.config.get('ARVADOS_API_HOST'),
                usr['email'])
            self._add_mount(e, 'README', StringFile(e.inode, text, now))

    def _add_mount(self, tld, name, ent):
        """Register ``ent`` as entry ``name`` of the top-level directory ``tld``.

        Exits the process if ``name`` is empty, '.', '..' or contains '/'.
        """
        if name in ['', '.', '..'] or '/' in name:
            sys.exit("Mount point '{}' is not supported.".format(name))
        tld._entries[name] = self.operations.inodes.add_entry(ent)
        self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())

    def _readme_text(self, api_host, user_email):
        """Return the README text shown at the root in --all mode."""
        return '''
Welcome to Arvados!  This directory provides file system access to
files and objects available on the Arvados installation located at
'{}' using credentials for user '{}'.

From here, the following directories are available:

  by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
  by_tag/    Access to Keep collections organized by tag.
  home/      The contents of your home project.
  shared/    Projects shared with you.

'''.format(api_host, user_email)

    def _run_exec(self):
        """Mount, run ``args.exec_args`` as a child process, unmount, and exit.

        SIGINT, SIGTERM and SIGQUIT received while the child runs are
        forwarded to it.  The process exits with the child's return code.
        If starting or waiting for the child fails, it exits with the
        exception's errno when that is non-zero, and otherwise with 255.
        """
        rc = 255
        forwarded_signals = (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT)
        with self:
            try:
                sp = subprocess.Popen(self.args.exec_args, shell=False)

                # forward signals to the process.
                for sig in forwarded_signals:
                    signal.signal(sig, lambda signum, frame: sp.send_signal(signum))
                try:
                    # wait for process to complete.
                    rc = sp.wait()
                finally:
                    # restore default signal handlers, even if wait() raised,
                    # so later signals are not sent to a finished child.
                    for sig in forwarded_signals:
                        signal.signal(sig, signal.SIG_DFL)
            except Exception as e:
                self.logger.exception(
                    'arv-mount: exception during exec %s', self.args.exec_args)
                # A missing, None or zero errno must not turn a failure into
                # a successful exit status.
                err_no = getattr(e, 'errno', None)
                if err_no:
                    rc = err_no
        sys.exit(rc)

    def _run_standalone(self):
        """Mount (daemonizing unless --foreground) and block until llfuse exits.

        Exits 0 after a clean shutdown.  On error it exits with the
        exception's non-zero errno, or 1 otherwise.
        """
        try:
            self.daemon = not self.args.foreground
            with self:
                self.llfuse_thread.join(timeout=None)
        except Exception as e:
            self.logger.exception('arv-mount: exception during mount: %s', e)
            # errno may be absent, None or 0; any of those would otherwise
            # report failure as success.
            sys.exit(getattr(e, 'errno', None) or 1)
        sys.exit(0)

    def _llfuse_main(self):
        """Run the llfuse main loop, then close the llfuse session.

        If the loop raises anything, the session is closed without
        unmounting (``__exit__`` handles unmount) and the error is re-raised.
        """
        try:
            llfuse.main()
        except BaseException:
            llfuse.close(unmount=False)
            raise
        llfuse.close()