Merge branch '21601-setuptools-git-deps'
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 from future.utils import native_str
6 from builtins import range
7 from builtins import object
8 import argparse
9 import arvados
10 import daemon
11 import llfuse
12 import logging
13 import os
14 import resource
15 import signal
16 import subprocess
17 import sys
18 import time
19 import resource
20
21 import arvados.commands._util as arv_cmd
22 from arvados_fuse import crunchstat
23 from arvados_fuse import *
24 from arvados_fuse.unmount import unmount
25 from arvados_fuse._version import __version__
26
27 class ArgumentParser(argparse.ArgumentParser):
28     def __init__(self):
29         super(ArgumentParser, self).__init__(
30             parents=[arv_cmd.retry_opt],
31             description="Interact with Arvados data through a local filesystem",
32         )
33         self.add_argument(
34             '--version',
35             action='version',
36             version=u"%s %s" % (sys.argv[0], __version__),
37             help="Print version and exit",
38         )
39         self.add_argument(
40             'mountpoint',
41             metavar='MOUNT_DIR',
42             help="Directory path to mount data",
43         )
44
45         mode_group = self.add_argument_group("Mount contents")
46         mode = mode_group.add_mutually_exclusive_group()
47         mode.add_argument(
48             '--all',
49             action='store_const',
50             const='all',
51             dest='mode',
52             help="""
53 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
54 (default if no `--mount-*` options are given)
55 """,
56         )
57         mode.add_argument(
58             '--custom',
59             action='store_const',
60             const=None,
61             dest='mode',
62             help="""
63 Mount a subdirectory for each mode specified by a `--mount-*` option
64 (default if any `--mount-*` options are given;
65 see "Mount custom layout and filtering" section)
66 """,
67         )
68         mode.add_argument(
69             '--collection',
70             metavar='UUID_OR_PDH',
71             help="Mount the specified collection",
72         )
73         mode.add_argument(
74             '--home',
75             action='store_const',
76             const='home',
77             dest='mode',
78             help="Mount your home project",
79         )
80         mode.add_argument(
81             '--project',
82             metavar='UUID',
83             help="Mount the specified project",
84         )
85         mode.add_argument(
86             '--shared',
87             action='store_const',
88             const='shared',
89             dest='mode',
90             help="Mount a subdirectory for each project shared with you",
91         )
92         mode.add_argument(
93             '--by-id',
94             action='store_const',
95             const='by_id',
96             dest='mode',
97             help="""
98 Mount a magic directory where collections and projects are accessible through
99 subdirectories named after their UUID or portable data hash
100 """,
101         )
102         mode.add_argument(
103             '--by-pdh',
104             action='store_const',
105             const='by_pdh',
106             dest='mode',
107             help="""
108 Mount a magic directory where collections are accessible through
109 subdirectories named after their portable data hash
110 """,
111         )
112         mode.add_argument(
113             '--by-tag',
114             action='store_const',
115             const='by_tag',
116             dest='mode',
117             help="Mount a subdirectory for each tag attached to a collection or project",
118         )
119
120         mounts = self.add_argument_group("Mount custom layout and filtering")
121         mounts.add_argument(
122             '--filters',
123             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
124             help="""
125 Filters to apply to all project, shared, and tag directory contents.
126 Pass filters as either a JSON string or a path to a JSON file.
127 The JSON object should be a list of filters in Arvados API list filter syntax.
128 """,
129         )
130         mounts.add_argument(
131             '--mount-home',
132             metavar='PATH',
133             action='append',
134             default=[],
135             help="Make your home project available under the mount at `PATH`",
136         )
137         mounts.add_argument(
138             '--mount-shared',
139             metavar='PATH',
140             action='append',
141             default=[],
142             help="Make projects shared with you available under the mount at `PATH`",
143         )
144         mounts.add_argument(
145             '--mount-tmp',
146             metavar='PATH',
147             action='append',
148             default=[],
149             help="""
150 Make a new temporary writable collection available under the mount at `PATH`.
151 This collection is deleted when the mount is unmounted.
152 """,
153         )
154         mounts.add_argument(
155             '--mount-by-id',
156             metavar='PATH',
157             action='append',
158             default=[],
159             help="""
160 Make a magic directory available under the mount at `PATH` where collections and
161 projects are accessible through subdirectories named after their UUID or
162 portable data hash
163 """,
164         )
165         mounts.add_argument(
166             '--mount-by-pdh',
167             metavar='PATH',
168             action='append',
169             default=[],
170             help="""
171 Make a magic directory available under the mount at `PATH` where collections
172 are accessible through subdirectories named after portable data hash
173 """,
174         )
175         mounts.add_argument(
176             '--mount-by-tag',
177             metavar='PATH',
178             action='append',
179             default=[],
180             help="""
181 Make a subdirectory for each tag attached to a collection or project available
182 under the mount at `PATH`
183 """ ,
184         )
185
186         perms = self.add_argument_group("Mount access and permissions")
187         perms.add_argument(
188             '--allow-other',
189             action='store_true',
190             help="Let other users on this system read mounted data (default false)",
191         )
192         perms.add_argument(
193             '--read-only',
194             action='store_false',
195             default=False,
196             dest='enable_write',
197             help="Mounted data cannot be modified from the mount (default)",
198         )
199         perms.add_argument(
200             '--read-write',
201             action='store_true',
202             default=False,
203             dest='enable_write',
204             help="Mounted data can be modified from the mount",
205         )
206
207         lifecycle = self.add_argument_group("Mount lifecycle management")
208         lifecycle.add_argument(
209             '--exec',
210             nargs=argparse.REMAINDER,
211             dest="exec_args",
212             help="""
213 Mount data, run the specified command, then unmount and exit.
214 `--exec` reads all remaining options as the command to run,
215 so it must be the last option you specify.
216 Either end your command arguments (and other options) with a `--` argument,
217 or specify `--exec` after your mount point.
218 """,
219         )
220         lifecycle.add_argument(
221             '--foreground',
222             action='store_true',
223             default=False,
224             help="Run mount process in the foreground instead of daemonizing (default false)",
225         )
226         lifecycle.add_argument(
227             '--subtype',
228             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
229         )
230         unmount = lifecycle.add_mutually_exclusive_group()
231         unmount.add_argument(
232             '--replace',
233             action='store_true',
234             default=False,
235             help="""
236 If a FUSE mount is already mounted at the given directory,
237 unmount it before mounting the requested data.
238 If `--subtype` is specified, unmount only if the mount has that subtype.
239 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
240 """,
241         )
242         unmount.add_argument(
243             '--unmount',
244             action='store_true',
245             default=False,
246             help="""
247 If a FUSE mount is already mounted at the given directory, unmount it and exit.
248 If `--subtype` is specified, unmount only if the mount has that subtype.
249 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
250 """,
251         )
252         unmount.add_argument(
253             '--unmount-all',
254             action='store_true',
255             default=False,
256             help="""
257 Unmount all FUSE mounts at or below the given directory, then exit.
258 If `--subtype` is specified, unmount only if the mount has that subtype.
259 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
260 """,
261         )
262         lifecycle.add_argument(
263             '--unmount-timeout',
264             type=float,
265             default=2.0,
266             metavar='SECONDS',
267             help="""
268 The number of seconds to wait for a clean unmount after an `--exec` command has
269 exited (default %(default).01f).
270 After this time, the mount will be forcefully unmounted.
271 """,
272         )
273
274         reporting = self.add_argument_group("Mount logging and statistics")
275         reporting.add_argument(
276             '--crunchstat-interval',
277             type=float,
278             default=0.0,
279             metavar='SECONDS',
280             help="Write stats to stderr every N seconds (default disabled)",
281         )
282         reporting.add_argument(
283             '--debug',
284             action='store_true',
285             help="Log debug information",
286         )
287         reporting.add_argument(
288             '--logfile',
289             help="Write debug logs and errors to the specified file (default stderr)",
290         )
291
292         cache = self.add_argument_group("Mount local cache setup")
293         cachetype = cache.add_mutually_exclusive_group()
294         cachetype.add_argument(
295             '--disk-cache',
296             action='store_true',
297             default=True,
298             dest='disk_cache',
299             help="Cache data on the local filesystem (default)",
300         )
301         cachetype.add_argument(
302             '--ram-cache',
303             action='store_false',
304             default=True,
305             dest='disk_cache',
306             help="Cache data in memory",
307         )
308         cache.add_argument(
309             '--disk-cache-dir',
310             metavar="DIRECTORY",
311             help="Filesystem cache location (default `~/.cache/arvados/keep`)",
312         )
313         cache.add_argument(
314             '--directory-cache',
315             type=int,
316             default=128*1024*1024,
317             metavar='BYTES',
318             help="Size of directory data cache in bytes (default 128 MiB)",
319         )
320         cache.add_argument(
321             '--file-cache',
322             type=int,
323             default=0,
324             metavar='BYTES',
325             help="""
326 Size of file data cache in bytes
327 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
328 """,
329         )
330
331         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
332         plumbing.add_argument(
333             '--disable-event-listening',
334             action='store_true',
335             dest='disable_event_listening',
336             default=False,
337             help="Don't subscribe to events on the API server to update mount contents",
338         )
339         plumbing.add_argument(
340             '--encoding',
341             default="utf-8",
342             help="""
343 Filesystem character encoding
344 (default %(default)r; specify a name from the Python codec registry)
345 """,
346         )
347         plumbing.add_argument(
348             '--storage-classes',
349             metavar='CLASSES',
350             help="Comma-separated list of storage classes to request for new collections",
351         )
352         # This is a hidden argument used by tests.  Normally this
353         # value will be extracted from the cluster config, but mocking
354         # the cluster config under the presence of multiple threads
355         # and processes turned out to be too complicated and brittle.
356         plumbing.add_argument(
357             '--fsns',
358             type=str,
359             default=None,
360             help=argparse.SUPPRESS)
361
362 class Mount(object):
363     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
364         self.daemon = False
365         self.logger = logger
366         self.args = args
367         self.listen_for_events = False
368
369         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
370         if self.args.logfile:
371             self.args.logfile = os.path.realpath(self.args.logfile)
372
373         try:
374             self._setup_logging()
375         except Exception as e:
376             self.logger.exception("exception during setup: %s", e)
377             exit(1)
378
379         try:
380             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
381
382             minlimit = 10240
383             if self.args.file_cache:
384                 # Adjust the file handle limit so it can meet
385                 # the desired cache size. Multiply by 8 because the
386                 # number of 64 MiB cache slots that keepclient
387                 # allocates is RLIMIT_NOFILE / 8
388                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
389
390             if nofile_limit[0] < minlimit:
391                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
392
393             if minlimit > nofile_limit[1]:
394                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
395
396         except Exception as e:
397             self.logger.warning("unable to adjust file handle limit: %s", e)
398
399         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
400         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
401
402         try:
403             self._setup_api()
404             self._setup_mount()
405         except Exception as e:
406             self.logger.exception("exception during setup: %s", e)
407             exit(1)
408
409     def __enter__(self):
410         if self.args.replace:
411             unmount(path=self.args.mountpoint,
412                     timeout=self.args.unmount_timeout)
413         llfuse.init(self.operations, native_str(self.args.mountpoint), self._fuse_options())
414         if self.daemon:
415             daemon.DaemonContext(
416                 working_directory=os.path.dirname(self.args.mountpoint),
417                 files_preserve=list(range(
418                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
419             ).open()
420         if self.listen_for_events and not self.args.disable_event_listening:
421             self.operations.listen_for_events()
422         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
423         self.llfuse_thread.daemon = True
424         self.llfuse_thread.start()
425         self.operations.initlock.wait()
426         return self
427
428     def __exit__(self, exc_type, exc_value, traceback):
429         if self.operations.events:
430             self.operations.events.close(timeout=self.args.unmount_timeout)
431         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
432         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
433         if self.llfuse_thread.is_alive():
434             self.logger.warning("Mount.__exit__:"
435                                 " llfuse thread still alive %fs after umount"
436                                 " -- abandoning and exiting anyway",
437                                 self.args.unmount_timeout)
438
439     def run(self):
440         if self.args.unmount or self.args.unmount_all:
441             unmount(path=self.args.mountpoint,
442                     subtype=self.args.subtype,
443                     timeout=self.args.unmount_timeout,
444                     recursive=self.args.unmount_all)
445         elif self.args.exec_args:
446             self._run_exec()
447         else:
448             self._run_standalone()
449
450     def _fuse_options(self):
451         """FUSE mount options; see mount.fuse(8)"""
452         opts = [optname for optname in ['allow_other', 'debug']
453                 if getattr(self.args, optname)]
454         # Increase default read/write size from 4KiB to 128KiB
455         opts += ["big_writes", "max_read=131072"]
456         if self.args.subtype:
457             opts += ["subtype="+self.args.subtype]
458         return opts
459
460     def _setup_logging(self):
461         # Configure a log handler based on command-line switches.
462         if self.args.logfile:
463             log_handler = logging.FileHandler(self.args.logfile)
464             log_handler.setFormatter(logging.Formatter(
465                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
466                 '%Y-%m-%d %H:%M:%S'))
467         else:
468             log_handler = None
469
470         if log_handler is not None:
471             arvados.logger.removeHandler(arvados.log_handler)
472             arvados.logger.addHandler(log_handler)
473
474         if self.args.debug:
475             arvados.logger.setLevel(logging.DEBUG)
476             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
477             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
478             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
479             self.logger.debug("arv-mount debugging enabled")
480
481         self.logger.info("%s %s started", sys.argv[0], __version__)
482         self.logger.info("enable write is %s", self.args.enable_write)
483
484     def _setup_api(self):
485         try:
486             # default value of file_cache is 0, this tells KeepBlockCache to
487             # choose a default based on whether disk_cache is enabled or not.
488
489             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
490                                                       disk_cache=self.args.disk_cache,
491                                                       disk_cache_dir=self.args.disk_cache_dir)
492
493             # If there's too many prefetch threads and you
494             # max out the CPU, delivering data to the FUSE
495             # layer actually ends up being slower.
496             # Experimentally, capping 7 threads seems to
497             # be a sweet spot.
498             prefetch_threads = min(max((block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
499
500             self.api = arvados.safeapi.ThreadSafeApiCache(
501                 apiconfig=arvados.config.settings(),
502                 api_params={
503                     'num_retries': self.args.retries,
504                 },
505                 keep_params={
506                     'block_cache': block_cache,
507                     'num_prefetch_threads': prefetch_threads,
508                     'num_retries': self.args.retries,
509                 },
510                 version='v1',
511             )
512         except KeyError as e:
513             self.logger.error("Missing environment: %s", e)
514             exit(1)
515         # Do a sanity check that we have a working arvados host + token.
516         self.api.users().current().execute()
517
518     def _setup_mount(self):
519         self.operations = Operations(
520             os.getuid(),
521             os.getgid(),
522             api_client=self.api,
523             encoding=self.args.encoding,
524             inode_cache=InodeCache(cap=self.args.directory_cache),
525             enable_write=self.args.enable_write,
526             fsns=self.args.fsns)
527
528         if self.args.crunchstat_interval:
529             statsthread = threading.Thread(
530                 target=crunchstat.statlogger,
531                 args=(self.args.crunchstat_interval,
532                       self.api.keep,
533                       self.operations))
534             statsthread.daemon = True
535             statsthread.start()
536
537         usr = self.api.users().current().execute(num_retries=self.args.retries)
538         now = time.time()
539         dir_class = None
540         dir_args = [
541             llfuse.ROOT_INODE,
542             self.operations.inodes,
543             self.api,
544             self.args.retries,
545             self.args.enable_write,
546             self.args.filters,
547         ]
548         mount_readme = False
549
550         storage_classes = None
551         if self.args.storage_classes is not None:
552             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
553             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
554
555         if self.args.collection is not None:
556             # Set up the request handler with the collection at the root
557             # First check that the collection is readable
558             self.api.collections().get(uuid=self.args.collection).execute()
559             self.args.mode = 'collection'
560             dir_class = CollectionDirectory
561             dir_args.append(self.args.collection)
562         elif self.args.project is not None:
563             self.args.mode = 'project'
564             dir_class = ProjectDirectory
565             dir_args.append(
566                 self.api.groups().get(uuid=self.args.project).execute(
567                     num_retries=self.args.retries))
568
569         if (self.args.mount_by_id or
570             self.args.mount_by_pdh or
571             self.args.mount_by_tag or
572             self.args.mount_home or
573             self.args.mount_shared or
574             self.args.mount_tmp):
575             if self.args.mode is not None:
576                 sys.exit(
577                     "Cannot combine '{}' mode with custom --mount-* options.".
578                     format(self.args.mode))
579         elif self.args.mode is None:
580             # If no --mount-custom or custom mount args, --all is the default
581             self.args.mode = 'all'
582
583         if self.args.mode in ['by_id', 'by_pdh']:
584             # Set up the request handler with the 'magic directory' at the root
585             dir_class = MagicDirectory
586             dir_args.append(self.args.mode == 'by_pdh')
587         elif self.args.mode == 'by_tag':
588             dir_class = TagsDirectory
589         elif self.args.mode == 'shared':
590             dir_class = SharedDirectory
591             dir_args.append(usr)
592         elif self.args.mode == 'home':
593             dir_class = ProjectDirectory
594             dir_args.append(usr)
595             dir_args.append(True)
596         elif self.args.mode == 'all':
597             self.args.mount_by_id = ['by_id']
598             self.args.mount_by_tag = ['by_tag']
599             self.args.mount_home = ['home']
600             self.args.mount_shared = ['shared']
601             mount_readme = True
602
603         if dir_class is not None:
604             if dir_class in [TagsDirectory, CollectionDirectory]:
605                 ent = dir_class(*dir_args)
606             else:
607                 ent = dir_class(*dir_args, storage_classes=storage_classes)
608             self.operations.inodes.add_entry(ent)
609             self.listen_for_events = ent.want_event_subscribe()
610             return
611
612         e = self.operations.inodes.add_entry(Directory(
613             llfuse.ROOT_INODE,
614             self.operations.inodes,
615             self.args.enable_write,
616             self.args.filters,
617         ))
618         dir_args[0] = e.inode
619
620         for name in self.args.mount_by_id:
621             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False, storage_classes=storage_classes))
622         for name in self.args.mount_by_pdh:
623             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True))
624         for name in self.args.mount_by_tag:
625             self._add_mount(e, name, TagsDirectory(*dir_args))
626         for name in self.args.mount_home:
627             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr, poll=True, storage_classes=storage_classes))
628         for name in self.args.mount_shared:
629             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr, poll=True, storage_classes=storage_classes))
630         for name in self.args.mount_tmp:
631             self._add_mount(e, name, TmpCollectionDirectory(*dir_args, storage_classes=storage_classes))
632
633         if mount_readme:
634             text = self._readme_text(
635                 arvados.config.get('ARVADOS_API_HOST'),
636                 usr['email'])
637             self._add_mount(e, 'README', StringFile(e.inode, text, now))
638
639     def _add_mount(self, tld, name, ent):
640         if name in ['', '.', '..'] or '/' in name:
641             sys.exit("Mount point '{}' is not supported.".format(name))
642         tld._entries[name] = self.operations.inodes.add_entry(ent)
643         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
644
645     def _readme_text(self, api_host, user_email):
646         return '''
647 Welcome to Arvados!  This directory provides file system access to
648 files and objects available on the Arvados installation located at
649 '{}' using credentials for user '{}'.
650
651 From here, the following directories are available:
652
653   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
654   by_tag/    Access to Keep collections organized by tag.
655   home/      The contents of your home project.
656   shared/    Projects shared with you.
657
658 '''.format(api_host, user_email)
659
660     def _run_exec(self):
661         rc = 255
662         with self:
663             try:
664                 sp = subprocess.Popen(self.args.exec_args, shell=False)
665
666                 # forward signals to the process.
667                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
668                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
669                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
670
671                 # wait for process to complete.
672                 rc = sp.wait()
673
674                 # restore default signal handlers.
675                 signal.signal(signal.SIGINT, signal.SIG_DFL)
676                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
677                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
678             except Exception as e:
679                 self.logger.exception(
680                     'arv-mount: exception during exec %s', self.args.exec_args)
681                 try:
682                     rc = e.errno
683                 except AttributeError:
684                     pass
685         exit(rc)
686
687     def _run_standalone(self):
688         try:
689             self.daemon = not self.args.foreground
690             with self:
691                 self.llfuse_thread.join(timeout=None)
692         except Exception as e:
693             self.logger.exception('arv-mount: exception during mount: %s', e)
694             exit(getattr(e, 'errno', 1))
695         exit(0)
696
697     def _llfuse_main(self):
698         try:
699             llfuse.main(workers=10)
700         except:
701             llfuse.close(unmount=False)
702             raise
703         self.operations.begin_shutdown()
704         llfuse.close()