Merge branch '21855-rpm-url-update'
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import argparse
6 import arvados
7 import daemon
8 import llfuse
9 import logging
10 import os
11 import resource
12 import signal
13 import subprocess
14 import sys
15 import time
16 import resource
17
18 import arvados.commands._util as arv_cmd
19 from arvados_fuse import crunchstat
20 from arvados_fuse import *
21 from arvados_fuse.unmount import unmount
22 from arvados_fuse._version import __version__
23
24 class ArgumentParser(argparse.ArgumentParser):
25     def __init__(self):
26         super(ArgumentParser, self).__init__(
27             parents=[arv_cmd.retry_opt],
28             description="Interact with Arvados data through a local filesystem",
29         )
30         self.add_argument(
31             '--version',
32             action='version',
33             version=u"%s %s" % (sys.argv[0], __version__),
34             help="Print version and exit",
35         )
36         self.add_argument(
37             'mountpoint',
38             metavar='MOUNT_DIR',
39             help="Directory path to mount data",
40         )
41
42         mode_group = self.add_argument_group("Mount contents")
43         mode = mode_group.add_mutually_exclusive_group()
44         mode.add_argument(
45             '--all',
46             action='store_const',
47             const='all',
48             dest='mode',
49             help="""
50 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
51 (default if no `--mount-*` options are given)
52 """,
53         )
54         mode.add_argument(
55             '--custom',
56             action='store_const',
57             const=None,
58             dest='mode',
59             help="""
60 Mount a subdirectory for each mode specified by a `--mount-*` option
61 (default if any `--mount-*` options are given;
62 see "Mount custom layout and filtering" section)
63 """,
64         )
65         mode.add_argument(
66             '--collection',
67             metavar='UUID_OR_PDH',
68             help="Mount the specified collection",
69         )
70         mode.add_argument(
71             '--home',
72             action='store_const',
73             const='home',
74             dest='mode',
75             help="Mount your home project",
76         )
77         mode.add_argument(
78             '--project',
79             metavar='UUID',
80             help="Mount the specified project",
81         )
82         mode.add_argument(
83             '--shared',
84             action='store_const',
85             const='shared',
86             dest='mode',
87             help="Mount a subdirectory for each project shared with you",
88         )
89         mode.add_argument(
90             '--by-id',
91             action='store_const',
92             const='by_id',
93             dest='mode',
94             help="""
95 Mount a magic directory where collections and projects are accessible through
96 subdirectories named after their UUID or portable data hash
97 """,
98         )
99         mode.add_argument(
100             '--by-pdh',
101             action='store_const',
102             const='by_pdh',
103             dest='mode',
104             help="""
105 Mount a magic directory where collections are accessible through
106 subdirectories named after their portable data hash
107 """,
108         )
109         mode.add_argument(
110             '--by-tag',
111             action='store_const',
112             const='by_tag',
113             dest='mode',
114             help="Mount a subdirectory for each tag attached to a collection or project",
115         )
116
117         mounts = self.add_argument_group("Mount custom layout and filtering")
118         mounts.add_argument(
119             '--filters',
120             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
121             help="""
122 Filters to apply to all project, shared, and tag directory contents.
123 Pass filters as either a JSON string or a path to a JSON file.
124 The JSON object should be a list of filters in Arvados API list filter syntax.
125 """,
126         )
127         mounts.add_argument(
128             '--mount-home',
129             metavar='PATH',
130             action='append',
131             default=[],
132             help="Make your home project available under the mount at `PATH`",
133         )
134         mounts.add_argument(
135             '--mount-shared',
136             metavar='PATH',
137             action='append',
138             default=[],
139             help="Make projects shared with you available under the mount at `PATH`",
140         )
141         mounts.add_argument(
142             '--mount-tmp',
143             metavar='PATH',
144             action='append',
145             default=[],
146             help="""
147 Make a new temporary writable collection available under the mount at `PATH`.
148 This collection is deleted when the mount is unmounted.
149 """,
150         )
151         mounts.add_argument(
152             '--mount-by-id',
153             metavar='PATH',
154             action='append',
155             default=[],
156             help="""
157 Make a magic directory available under the mount at `PATH` where collections and
158 projects are accessible through subdirectories named after their UUID or
159 portable data hash
160 """,
161         )
162         mounts.add_argument(
163             '--mount-by-pdh',
164             metavar='PATH',
165             action='append',
166             default=[],
167             help="""
168 Make a magic directory available under the mount at `PATH` where collections
169 are accessible through subdirectories named after portable data hash
170 """,
171         )
172         mounts.add_argument(
173             '--mount-by-tag',
174             metavar='PATH',
175             action='append',
176             default=[],
177             help="""
178 Make a subdirectory for each tag attached to a collection or project available
179 under the mount at `PATH`
180 """ ,
181         )
182
183         perms = self.add_argument_group("Mount access and permissions")
184         perms.add_argument(
185             '--allow-other',
186             action='store_true',
187             help="Let other users on this system read mounted data (default false)",
188         )
189         perms.add_argument(
190             '--read-only',
191             action='store_false',
192             default=False,
193             dest='enable_write',
194             help="Mounted data cannot be modified from the mount (default)",
195         )
196         perms.add_argument(
197             '--read-write',
198             action='store_true',
199             default=False,
200             dest='enable_write',
201             help="Mounted data can be modified from the mount",
202         )
203
204         lifecycle = self.add_argument_group("Mount lifecycle management")
205         lifecycle.add_argument(
206             '--exec',
207             nargs=argparse.REMAINDER,
208             dest="exec_args",
209             help="""
210 Mount data, run the specified command, then unmount and exit.
211 `--exec` reads all remaining options as the command to run,
212 so it must be the last option you specify.
213 Either end your command arguments (and other options) with a `--` argument,
214 or specify `--exec` after your mount point.
215 """,
216         )
217         lifecycle.add_argument(
218             '--foreground',
219             action='store_true',
220             default=False,
221             help="Run mount process in the foreground instead of daemonizing (default false)",
222         )
223         lifecycle.add_argument(
224             '--subtype',
225             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
226         )
227         unmount = lifecycle.add_mutually_exclusive_group()
228         unmount.add_argument(
229             '--replace',
230             action='store_true',
231             default=False,
232             help="""
233 If a FUSE mount is already mounted at the given directory,
234 unmount it before mounting the requested data.
235 If `--subtype` is specified, unmount only if the mount has that subtype.
236 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
237 """,
238         )
239         unmount.add_argument(
240             '--unmount',
241             action='store_true',
242             default=False,
243             help="""
244 If a FUSE mount is already mounted at the given directory, unmount it and exit.
245 If `--subtype` is specified, unmount only if the mount has that subtype.
246 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
247 """,
248         )
249         unmount.add_argument(
250             '--unmount-all',
251             action='store_true',
252             default=False,
253             help="""
254 Unmount all FUSE mounts at or below the given directory, then exit.
255 If `--subtype` is specified, unmount only if the mount has that subtype.
256 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
257 """,
258         )
259         lifecycle.add_argument(
260             '--unmount-timeout',
261             type=float,
262             default=2.0,
263             metavar='SECONDS',
264             help="""
265 The number of seconds to wait for a clean unmount after an `--exec` command has
266 exited (default %(default).01f).
267 After this time, the mount will be forcefully unmounted.
268 """,
269         )
270
271         reporting = self.add_argument_group("Mount logging and statistics")
272         reporting.add_argument(
273             '--crunchstat-interval',
274             type=float,
275             default=0.0,
276             metavar='SECONDS',
277             help="Write stats to stderr every N seconds (default disabled)",
278         )
279         reporting.add_argument(
280             '--debug',
281             action='store_true',
282             help="Log debug information",
283         )
284         reporting.add_argument(
285             '--logfile',
286             help="Write debug logs and errors to the specified file (default stderr)",
287         )
288
289         cache = self.add_argument_group("Mount local cache setup")
290         cachetype = cache.add_mutually_exclusive_group()
291         cachetype.add_argument(
292             '--disk-cache',
293             action='store_true',
294             default=True,
295             dest='disk_cache',
296             help="Cache data on the local filesystem (default)",
297         )
298         cachetype.add_argument(
299             '--ram-cache',
300             action='store_false',
301             default=True,
302             dest='disk_cache',
303             help="Cache data in memory",
304         )
305         cache.add_argument(
306             '--disk-cache-dir',
307             metavar="DIRECTORY",
308             help="Set custom filesystem cache location",
309         )
310         cache.add_argument(
311             '--directory-cache',
312             type=int,
313             default=128*1024*1024,
314             metavar='BYTES',
315             help="Size of directory data cache in bytes (default 128 MiB)",
316         )
317         cache.add_argument(
318             '--file-cache',
319             type=int,
320             default=0,
321             metavar='BYTES',
322             help="""
323 Size of file data cache in bytes
324 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
325 """,
326         )
327
328         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
329         plumbing.add_argument(
330             '--disable-event-listening',
331             action='store_true',
332             dest='disable_event_listening',
333             default=False,
334             help="Don't subscribe to events on the API server to update mount contents",
335         )
336         plumbing.add_argument(
337             '--encoding',
338             default="utf-8",
339             help="""
340 Filesystem character encoding
341 (default %(default)r; specify a name from the Python codec registry)
342 """,
343         )
344         plumbing.add_argument(
345             '--storage-classes',
346             metavar='CLASSES',
347             help="Comma-separated list of storage classes to request for new collections",
348         )
349         # This is a hidden argument used by tests.  Normally this
350         # value will be extracted from the cluster config, but mocking
351         # the cluster config under the presence of multiple threads
352         # and processes turned out to be too complicated and brittle.
353         plumbing.add_argument(
354             '--fsns',
355             type=str,
356             default=None,
357             help=argparse.SUPPRESS)
358
359 class Mount(object):
360     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
361         self.daemon = False
362         self.logger = logger
363         self.args = args
364         self.listen_for_events = False
365
366         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
367         if self.args.logfile:
368             self.args.logfile = os.path.realpath(self.args.logfile)
369
370         try:
371             self._setup_logging()
372         except Exception as e:
373             self.logger.exception("exception during setup: %s", e)
374             exit(1)
375
376         try:
377             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
378
379             minlimit = 10240
380             if self.args.file_cache:
381                 # Adjust the file handle limit so it can meet
382                 # the desired cache size. Multiply by 8 because the
383                 # number of 64 MiB cache slots that keepclient
384                 # allocates is RLIMIT_NOFILE / 8
385                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
386
387             if nofile_limit[0] < minlimit:
388                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
389
390             if minlimit > nofile_limit[1]:
391                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
392
393         except Exception as e:
394             self.logger.warning("unable to adjust file handle limit: %s", e)
395
396         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
397         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
398
399         try:
400             self._setup_api()
401             self._setup_mount()
402         except Exception as e:
403             self.logger.exception("exception during setup: %s", e)
404             exit(1)
405
406     def __enter__(self):
407         if self.args.replace:
408             unmount(path=self.args.mountpoint,
409                     timeout=self.args.unmount_timeout)
410         llfuse.init(self.operations, str(self.args.mountpoint), self._fuse_options())
411         if self.daemon:
412             daemon.DaemonContext(
413                 working_directory=os.path.dirname(self.args.mountpoint),
414                 files_preserve=list(range(
415                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
416             ).open()
417         if self.listen_for_events and not self.args.disable_event_listening:
418             self.operations.listen_for_events()
419         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
420         self.llfuse_thread.daemon = True
421         self.llfuse_thread.start()
422         self.operations.initlock.wait()
423         return self
424
425     def __exit__(self, exc_type, exc_value, traceback):
426         if self.operations.events:
427             self.operations.events.close(timeout=self.args.unmount_timeout)
428         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
429         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
430         if self.llfuse_thread.is_alive():
431             self.logger.warning("Mount.__exit__:"
432                                 " llfuse thread still alive %fs after umount"
433                                 " -- abandoning and exiting anyway",
434                                 self.args.unmount_timeout)
435
436     def run(self):
437         if self.args.unmount or self.args.unmount_all:
438             unmount(path=self.args.mountpoint,
439                     subtype=self.args.subtype,
440                     timeout=self.args.unmount_timeout,
441                     recursive=self.args.unmount_all)
442         elif self.args.exec_args:
443             self._run_exec()
444         else:
445             self._run_standalone()
446
447     def _fuse_options(self):
448         """FUSE mount options; see mount.fuse(8)"""
449         opts = [optname for optname in ['allow_other', 'debug']
450                 if getattr(self.args, optname)]
451         # Increase default read/write size from 4KiB to 128KiB
452         opts += ["big_writes", "max_read=131072"]
453         if self.args.subtype:
454             opts += ["subtype="+self.args.subtype]
455         return opts
456
457     def _setup_logging(self):
458         # Configure a log handler based on command-line switches.
459         if self.args.logfile:
460             log_handler = logging.FileHandler(self.args.logfile)
461             log_handler.setFormatter(logging.Formatter(
462                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
463                 '%Y-%m-%d %H:%M:%S'))
464         else:
465             log_handler = None
466
467         if log_handler is not None:
468             arvados.logger.removeHandler(arvados.log_handler)
469             arvados.logger.addHandler(log_handler)
470
471         if self.args.debug:
472             arvados.logger.setLevel(logging.DEBUG)
473             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
474             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
475             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
476             self.logger.debug("arv-mount debugging enabled")
477
478         self.logger.info("%s %s started", sys.argv[0], __version__)
479         self.logger.info("enable write is %s", self.args.enable_write)
480
481     def _setup_api(self):
482         try:
483             # default value of file_cache is 0, this tells KeepBlockCache to
484             # choose a default based on whether disk_cache is enabled or not.
485
486             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
487                                                       disk_cache=self.args.disk_cache,
488                                                       disk_cache_dir=self.args.disk_cache_dir)
489
490             self.api = arvados.safeapi.ThreadSafeApiCache(
491                 apiconfig=arvados.config.settings(),
492                 api_params={
493                     'num_retries': self.args.retries,
494                 },
495                 keep_params={
496                     'block_cache': block_cache,
497                     'num_retries': self.args.retries,
498                 },
499                 version='v1',
500             )
501         except KeyError as e:
502             self.logger.error("Missing environment: %s", e)
503             exit(1)
504         # Do a sanity check that we have a working arvados host + token.
505         self.api.users().current().execute()
506
507     def _setup_mount(self):
508         self.operations = Operations(
509             os.getuid(),
510             os.getgid(),
511             api_client=self.api,
512             encoding=self.args.encoding,
513             inode_cache=InodeCache(cap=self.args.directory_cache),
514             enable_write=self.args.enable_write,
515             fsns=self.args.fsns)
516
517         if self.args.crunchstat_interval:
518             statsthread = threading.Thread(
519                 target=crunchstat.statlogger,
520                 args=(self.args.crunchstat_interval,
521                       self.api.keep,
522                       self.operations))
523             statsthread.daemon = True
524             statsthread.start()
525
526         usr = self.api.users().current().execute(num_retries=self.args.retries)
527         now = time.time()
528         dir_class = None
529         dir_args = [
530             llfuse.ROOT_INODE,
531             self.operations.inodes,
532             self.api,
533             self.args.retries,
534             self.args.enable_write,
535             self.args.filters,
536         ]
537         mount_readme = False
538
539         storage_classes = None
540         if self.args.storage_classes is not None:
541             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
542             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
543
544         if self.args.collection is not None:
545             # Set up the request handler with the collection at the root
546             # First check that the collection is readable
547             self.api.collections().get(uuid=self.args.collection).execute()
548             self.args.mode = 'collection'
549             dir_class = CollectionDirectory
550             dir_args.append(self.args.collection)
551         elif self.args.project is not None:
552             self.args.mode = 'project'
553             dir_class = ProjectDirectory
554             dir_args.append(
555                 self.api.groups().get(uuid=self.args.project).execute(
556                     num_retries=self.args.retries))
557
558         if (self.args.mount_by_id or
559             self.args.mount_by_pdh or
560             self.args.mount_by_tag or
561             self.args.mount_home or
562             self.args.mount_shared or
563             self.args.mount_tmp):
564             if self.args.mode is not None:
565                 sys.exit(
566                     "Cannot combine '{}' mode with custom --mount-* options.".
567                     format(self.args.mode))
568         elif self.args.mode is None:
569             # If no --mount-custom or custom mount args, --all is the default
570             self.args.mode = 'all'
571
572         if self.args.mode in ['by_id', 'by_pdh']:
573             # Set up the request handler with the 'magic directory' at the root
574             dir_class = MagicDirectory
575             dir_args.append(self.args.mode == 'by_pdh')
576         elif self.args.mode == 'by_tag':
577             dir_class = TagsDirectory
578         elif self.args.mode == 'shared':
579             dir_class = SharedDirectory
580             dir_args.append(usr)
581         elif self.args.mode == 'home':
582             dir_class = ProjectDirectory
583             dir_args.append(usr)
584             dir_args.append(True)
585         elif self.args.mode == 'all':
586             self.args.mount_by_id = ['by_id']
587             self.args.mount_by_tag = ['by_tag']
588             self.args.mount_home = ['home']
589             self.args.mount_shared = ['shared']
590             mount_readme = True
591
592         if dir_class is not None:
593             if dir_class in [TagsDirectory, CollectionDirectory]:
594                 ent = dir_class(*dir_args)
595             else:
596                 ent = dir_class(*dir_args, storage_classes=storage_classes)
597             self.operations.inodes.add_entry(ent)
598             self.listen_for_events = ent.want_event_subscribe()
599             return
600
601         e = self.operations.inodes.add_entry(Directory(
602             llfuse.ROOT_INODE,
603             self.operations.inodes,
604             self.args.enable_write,
605             self.args.filters,
606         ))
607         dir_args[0] = e.inode
608
609         for name in self.args.mount_by_id:
610             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
611         for name in self.args.mount_by_pdh:
612             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
613         for name in self.args.mount_by_tag:
614             self._add_mount(e, name, TagsDirectory(*dir_args))
615         for name in self.args.mount_home:
616             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
617         for name in self.args.mount_shared:
618             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
619         for name in self.args.mount_tmp:
620             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
621
622         if mount_readme:
623             text = self._readme_text(
624                 arvados.config.get('ARVADOS_API_HOST'),
625                 usr['email'])
626             self._add_mount(e, 'README', StringFile(e.inode, text, now))
627
628     def _add_mount(self, tld, name, ent):
629         if name in ['', '.', '..'] or '/' in name:
630             sys.exit("Mount point '{}' is not supported.".format(name))
631         tld._entries[name] = self.operations.inodes.add_entry(ent)
632         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
633
634     def _readme_text(self, api_host, user_email):
635         return '''
636 Welcome to Arvados!  This directory provides file system access to
637 files and objects available on the Arvados installation located at
638 '{}' using credentials for user '{}'.
639
640 From here, the following directories are available:
641
642   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
643   by_tag/    Access to Keep collections organized by tag.
644   home/      The contents of your home project.
645   shared/    Projects shared with you.
646
647 '''.format(api_host, user_email)
648
649     def _run_exec(self):
650         rc = 255
651         with self:
652             try:
653                 sp = subprocess.Popen(self.args.exec_args, shell=False)
654
655                 # forward signals to the process.
656                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
657                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
658                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
659
660                 # wait for process to complete.
661                 rc = sp.wait()
662
663                 # restore default signal handlers.
664                 signal.signal(signal.SIGINT, signal.SIG_DFL)
665                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
666                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
667             except Exception as e:
668                 self.logger.exception(
669                     'arv-mount: exception during exec %s', self.args.exec_args)
670                 try:
671                     rc = e.errno
672                 except AttributeError:
673                     pass
674         exit(rc)
675
676     def _run_standalone(self):
677         try:
678             self.daemon = not self.args.foreground
679             with self:
680                 self.llfuse_thread.join(timeout=None)
681         except Exception as e:
682             self.logger.exception('arv-mount: exception during mount: %s', e)
683             exit(getattr(e, 'errno', 1))
684         exit(0)
685
686     def _llfuse_main(self):
687         try:
688             llfuse.main(workers=10)
689         except:
690             llfuse.close(unmount=False)
691             raise
692         self.operations.begin_shutdown()
693         llfuse.close()