]> git.arvados.org - arvados.git/blob - services/fuse/arvados_fuse/command.py
22845: counts now update when fetched
[arvados.git] / services / fuse / arvados_fuse / command.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import argparse
6 import arvados
7 import daemon
8 import llfuse
9 import logging
10 import os
11 import resource
12 import signal
13 import subprocess
14 import sys
15 import time
16 import resource
17
18 import arvados.commands._util as arv_cmd
19 from arvados_fuse import crunchstat
20 from arvados_fuse import *
21 from arvados_fuse.unmount import unmount
22 from arvados_fuse._version import __version__
23
24 class ArgumentParser(argparse.ArgumentParser):
25     def __init__(self):
26         super(ArgumentParser, self).__init__(
27             parents=[arv_cmd.retry_opt],
28             description="Interact with Arvados data through a local filesystem",
29         )
30         self.add_argument(
31             '--version',
32             action='version',
33             version=u"%s %s" % (sys.argv[0], __version__),
34             help="Print version and exit",
35         )
36         self.add_argument(
37             'mountpoint',
38             metavar='MOUNT_DIR',
39             help="Directory path to mount data",
40         )
41
42         mode_group = self.add_argument_group("Mount contents")
43         mode = mode_group.add_mutually_exclusive_group()
44         mode.add_argument(
45             '--all',
46             action='store_const',
47             const='all',
48             dest='mode',
49             help="""
50 Mount a subdirectory for each mode: `home`, `shared`, `by_id`, and `by_tag`
51 (default if no `--mount-*` options are given)
52 """,
53         )
54         mode.add_argument(
55             '--custom',
56             action='store_const',
57             const=None,
58             dest='mode',
59             help="""
60 Mount a subdirectory for each mode specified by a `--mount-*` option
61 (default if any `--mount-*` options are given;
62 see "Mount custom layout and filtering" section)
63 """,
64         )
65         mode.add_argument(
66             '--collection',
67             metavar='UUID_OR_PDH',
68             help="Mount the specified collection",
69         )
70         mode.add_argument(
71             '--home',
72             action='store_const',
73             const='home',
74             dest='mode',
75             help="Mount your home project",
76         )
77         mode.add_argument(
78             '--project',
79             metavar='UUID',
80             help="Mount the specified project",
81         )
82         mode.add_argument(
83             '--shared',
84             action='store_const',
85             const='shared',
86             dest='mode',
87             help="Mount a subdirectory for each project shared with you",
88         )
89         mode.add_argument(
90             '--by-id',
91             action='store_const',
92             const='by_id',
93             dest='mode',
94             help="""
95 Mount a magic directory where collections and projects are accessible through
96 subdirectories named after their UUID or portable data hash
97 """,
98         )
99         mode.add_argument(
100             '--by-pdh',
101             action='store_const',
102             const='by_pdh',
103             dest='mode',
104             help="""
105 Mount a magic directory where collections are accessible through
106 subdirectories named after their portable data hash
107 """,
108         )
109         mode.add_argument(
110             '--by-tag',
111             action='store_const',
112             const='by_tag',
113             dest='mode',
114             help="Mount a subdirectory for each tag attached to a collection or project",
115         )
116
117         mounts = self.add_argument_group("Mount custom layout and filtering")
118         mounts.add_argument(
119             '--filters',
120             type=arv_cmd.JSONArgument(arv_cmd.validate_filters),
121             help="""
122 Filters to apply to all project, shared, and tag directory contents.
123 Pass filters as either a JSON string or a path to a JSON file.
124 The JSON object should be a list of filters in Arvados API list filter syntax.
125 """,
126         )
127         mounts.add_argument(
128             '--mount-home',
129             metavar='PATH',
130             action='append',
131             default=[],
132             help="Make your home project available under the mount at `PATH`",
133         )
134         mounts.add_argument(
135             '--mount-shared',
136             metavar='PATH',
137             action='append',
138             default=[],
139             help="Make projects shared with you available under the mount at `PATH`",
140         )
141         mounts.add_argument(
142             '--mount-tmp',
143             metavar='PATH',
144             action='append',
145             default=[],
146             help="""
147 Make a new temporary writable collection available under the mount at `PATH`.
148 This collection is deleted when the mount is unmounted.
149 """,
150         )
151         mounts.add_argument(
152             '--mount-by-id',
153             metavar='PATH',
154             action='append',
155             default=[],
156             help="""
157 Make a magic directory available under the mount at `PATH` where collections and
158 projects are accessible through subdirectories named after their UUID or
159 portable data hash
160 """,
161         )
162         mounts.add_argument(
163             '--mount-by-pdh',
164             metavar='PATH',
165             action='append',
166             default=[],
167             help="""
168 Make a magic directory available under the mount at `PATH` where collections
169 are accessible through subdirectories named after portable data hash
170 """,
171         )
172         mounts.add_argument(
173             '--mount-by-tag',
174             metavar='PATH',
175             action='append',
176             default=[],
177             help="""
178 Make a subdirectory for each tag attached to a collection or project available
179 under the mount at `PATH`
180 """ ,
181         )
182
183         perms = self.add_argument_group("Mount access and permissions")
184         perms.add_argument(
185             '--allow-other',
186             action='store_true',
187             help="Let other users on this system read mounted data (default false)",
188         )
189         perms.add_argument(
190             '--read-only',
191             action='store_false',
192             default=False,
193             dest='enable_write',
194             help="Mounted data cannot be modified from the mount (default)",
195         )
196         perms.add_argument(
197             '--read-write',
198             action='store_true',
199             default=False,
200             dest='enable_write',
201             help="Mounted data can be modified from the mount",
202         )
203
204         lifecycle = self.add_argument_group("Mount lifecycle management")
205         lifecycle.add_argument(
206             '--exec',
207             nargs=argparse.REMAINDER,
208             dest="exec_args",
209             help="""
210 Mount data, run the specified command, then unmount and exit.
211 `--exec` reads all remaining options as the command to run,
212 so it must be the last option you specify.
213 Either end your command arguments (and other options) with a `--` argument,
214 or specify `--exec` after your mount point.
215 """,
216         )
217         lifecycle.add_argument(
218             '--foreground',
219             action='store_true',
220             default=False,
221             help="Run mount process in the foreground instead of daemonizing (default false)",
222         )
223         lifecycle.add_argument(
224             '--subtype',
225             help="Set mounted filesystem type to `fuse.SUBTYPE` (default is just `fuse`)",
226         )
227         unmount = lifecycle.add_mutually_exclusive_group()
228         unmount.add_argument(
229             '--replace',
230             action='store_true',
231             default=False,
232             help="""
233 If a FUSE mount is already mounted at the given directory,
234 unmount it before mounting the requested data.
235 If `--subtype` is specified, unmount only if the mount has that subtype.
236 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
237 """,
238         )
239         unmount.add_argument(
240             '--unmount',
241             action='store_true',
242             default=False,
243             help="""
244 If a FUSE mount is already mounted at the given directory, unmount it and exit.
245 If `--subtype` is specified, unmount only if the mount has that subtype.
246 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
247 """,
248         )
249         unmount.add_argument(
250             '--unmount-all',
251             action='store_true',
252             default=False,
253             help="""
254 Unmount all FUSE mounts at or below the given directory, then exit.
255 If `--subtype` is specified, unmount only if the mount has that subtype.
256 WARNING: This command can affect any kind of FUSE mount, not just arv-mount.
257 """,
258         )
259         lifecycle.add_argument(
260             '--unmount-timeout',
261             type=float,
262             default=2.0,
263             metavar='SECONDS',
264             help="""
265 The number of seconds to wait for a clean unmount after an `--exec` command has
266 exited (default %(default).01f).
267 After this time, the mount will be forcefully unmounted.
268 """,
269         )
270
271         reporting = self.add_argument_group("Mount logging and statistics")
272         reporting.add_argument(
273             '--crunchstat-interval',
274             type=float,
275             default=0.0,
276             metavar='SECONDS',
277             help="Write stats to stderr every N seconds (default disabled)",
278         )
279         reporting.add_argument(
280             '--debug',
281             action='store_true',
282             help="Log debug information",
283         )
284         reporting.add_argument(
285             '--logfile',
286             help="Write debug logs and errors to the specified file (default stderr)",
287         )
288
289         cache = self.add_argument_group("Mount local cache setup")
290         cachetype = cache.add_mutually_exclusive_group()
291         cachetype.add_argument(
292             '--disk-cache',
293             action='store_true',
294             default=True,
295             dest='disk_cache',
296             help="Cache data on the local filesystem (default)",
297         )
298         cachetype.add_argument(
299             '--ram-cache',
300             action='store_false',
301             default=True,
302             dest='disk_cache',
303             help="Cache data in memory",
304         )
305         cache.add_argument(
306             '--disk-cache-dir',
307             metavar="DIRECTORY",
308             help="Set custom filesystem cache location",
309         )
310         cache.add_argument(
311             '--directory-cache',
312             type=int,
313             default=128*1024*1024,
314             metavar='BYTES',
315             help="Size of directory data cache in bytes (default 128 MiB)",
316         )
317         cache.add_argument(
318             '--file-cache',
319             type=int,
320             default=0,
321             metavar='BYTES',
322             help="""
323 Size of file data cache in bytes
324 (default 8 GiB for filesystem cache, 256 MiB for memory cache)
325 """,
326         )
327
328         plumbing = self.add_argument_group("Mount interactions with Arvados and Linux")
329         plumbing.add_argument(
330             '--disable-event-listening',
331             action='store_true',
332             dest='disable_event_listening',
333             default=False,
334             help="Don't subscribe to events on the API server to update mount contents",
335         )
336         plumbing.add_argument(
337             '--encoding',
338             default="utf-8",
339             help="""
340 Filesystem character encoding
341 (default %(default)r; specify a name from the Python codec registry)
342 """,
343         )
344         plumbing.add_argument(
345             '--storage-classes',
346             metavar='CLASSES',
347             help="Comma-separated list of storage classes to request for new collections",
348         )
349         plumbing.add_argument(
350             '--refresh-time',
351             metavar='SECONDS',
352             default=15,
353             type=int,
354             help="Upper limit on how long mount contents may be out of date with upstream Arvados before being refreshed on next access (default 15 seconds)",
355         )
356         # This is a hidden argument used by tests.  Normally this
357         # value will be extracted from the cluster config, but mocking
358         # the cluster config under the presence of multiple threads
359         # and processes turned out to be too complicated and brittle.
360         plumbing.add_argument(
361             '--fsns',
362             type=str,
363             default=None,
364             help=argparse.SUPPRESS)
365
366 class Mount(object):
367     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
368         self.daemon = False
369         self.logger = logger
370         self.args = args
371         self.listen_for_events = False
372
373         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
374         if self.args.logfile:
375             self.args.logfile = os.path.realpath(self.args.logfile)
376
377         try:
378             self._setup_logging()
379         except Exception as e:
380             self.logger.exception("exception during setup: %s", e)
381             exit(1)
382
383         try:
384             nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
385
386             minlimit = 10240
387             if self.args.file_cache:
388                 # Adjust the file handle limit so it can meet
389                 # the desired cache size. Multiply by 8 because the
390                 # number of 64 MiB cache slots that keepclient
391                 # allocates is RLIMIT_NOFILE / 8
392                 minlimit = int((self.args.file_cache/(64*1024*1024)) * 8)
393
394             if nofile_limit[0] < minlimit:
395                 resource.setrlimit(resource.RLIMIT_NOFILE, (min(minlimit, nofile_limit[1]), nofile_limit[1]))
396
397             if minlimit > nofile_limit[1]:
398                 self.logger.warning("file handles required to meet --file-cache (%s) exceeds hard file handle limit (%s), cache size will be smaller than requested", minlimit, nofile_limit[1])
399
400         except Exception as e:
401             self.logger.warning("unable to adjust file handle limit: %s", e)
402
403         nofile_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
404         self.logger.info("file cache capped at %s bytes or less based on available disk (RLIMIT_NOFILE is %s)", ((nofile_limit[0]//8)*64*1024*1024), nofile_limit)
405
406         try:
407             self._setup_api()
408             self._setup_mount()
409         except Exception as e:
410             self.logger.exception("exception during setup: %s", e)
411             exit(1)
412
413     def __enter__(self):
414         if self.args.replace:
415             unmount(path=self.args.mountpoint,
416                     timeout=self.args.unmount_timeout)
417         llfuse.init(self.operations, str(self.args.mountpoint), self._fuse_options())
418         if self.daemon:
419             daemon.DaemonContext(
420                 working_directory=os.path.dirname(self.args.mountpoint),
421                 files_preserve=list(range(
422                     3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
423             ).open()
424         if self.listen_for_events and not self.args.disable_event_listening:
425             self.operations.listen_for_events()
426         self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
427         self.llfuse_thread.daemon = True
428         self.llfuse_thread.start()
429         self.operations.initlock.wait()
430         return self
431
432     def __exit__(self, exc_type, exc_value, traceback):
433         if self.operations.events:
434             self.operations.events.close(timeout=self.args.unmount_timeout)
435         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
436         self.llfuse_thread.join(timeout=self.args.unmount_timeout)
437         self.api.keep.block_cache.clear()
438         if self.llfuse_thread.is_alive():
439             self.logger.warning("Mount.__exit__:"
440                                 " llfuse thread still alive %fs after umount"
441                                 " -- abandoning and exiting anyway",
442                                 self.args.unmount_timeout)
443
444     def run(self):
445         if self.args.unmount or self.args.unmount_all:
446             unmount(path=self.args.mountpoint,
447                     subtype=self.args.subtype,
448                     timeout=self.args.unmount_timeout,
449                     recursive=self.args.unmount_all)
450         elif self.args.exec_args:
451             self._run_exec()
452         else:
453             self._run_standalone()
454
455     def _fuse_options(self):
456         """FUSE mount options; see mount.fuse(8)"""
457         opts = [optname for optname in ['allow_other', 'debug']
458                 if getattr(self.args, optname)]
459         # Increase default read/write size from 4KiB to 128KiB
460         opts += ["big_writes", "max_read=131072"]
461         if self.args.subtype:
462             opts += ["subtype="+self.args.subtype]
463         return opts
464
465     def _setup_logging(self):
466         # Configure a log handler based on command-line switches.
467         if self.args.logfile:
468             log_handler = logging.FileHandler(self.args.logfile)
469             log_handler.setFormatter(logging.Formatter(
470                 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
471                 '%Y-%m-%d %H:%M:%S'))
472         else:
473             log_handler = None
474
475         if log_handler is not None:
476             arvados.logger.removeHandler(arvados.log_handler)
477             arvados.logger.addHandler(log_handler)
478
479         if self.args.debug:
480             arvados.logger.setLevel(logging.DEBUG)
481             logging.getLogger('arvados.keep').setLevel(logging.DEBUG)
482             logging.getLogger('arvados.api').setLevel(logging.DEBUG)
483             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
484             self.logger.debug("arv-mount debugging enabled")
485
486         self.logger.info("%s %s started", sys.argv[0], __version__)
487         self.logger.info("enable write is %s", self.args.enable_write)
488
489     def _setup_api(self):
490         try:
491             # default value of file_cache is 0, this tells KeepBlockCache to
492             # choose a default based on whether disk_cache is enabled or not.
493
494             block_cache = arvados.keep.KeepBlockCache(cache_max=self.args.file_cache,
495                                                       disk_cache=self.args.disk_cache,
496                                                       disk_cache_dir=self.args.disk_cache_dir)
497
498             self.api = arvados.safeapi.ThreadSafeApiCache(
499                 apiconfig=arvados.config.settings(),
500                 api_params={
501                     'num_retries': self.args.retries,
502                 },
503                 keep_params={
504                     'block_cache': block_cache,
505                     'num_retries': self.args.retries,
506                 },
507                 version='v1',
508             )
509         except KeyError as e:
510             self.logger.error("Missing environment: %s", e)
511             exit(1)
512         # Do a sanity check that we have a working arvados host + token.
513         self.api.users().current().execute()
514
515     def _setup_mount(self):
516         self.operations = Operations(
517             os.getuid(),
518             os.getgid(),
519             api_client=self.api,
520             encoding=self.args.encoding,
521             inode_cache=InodeCache(cap=self.args.directory_cache),
522             enable_write=self.args.enable_write,
523             fsns=self.args.fsns)
524
525         if self.args.crunchstat_interval:
526             statsthread = threading.Thread(
527                 target=crunchstat.statlogger,
528                 args=(self.args.crunchstat_interval,
529                       self.api.keep,
530                       self.operations))
531             statsthread.daemon = True
532             statsthread.start()
533
534         usr = self.api.users().current().execute(num_retries=self.args.retries)
535         now = time.time()
536         dir_class = None
537         dir_args = [
538             llfuse.ROOT_INODE,
539             self.operations.inodes,
540             self.api,
541             self.args.retries,
542             self.args.enable_write,
543             self.args.filters,
544         ]
545         mount_readme = False
546
547         storage_classes = None
548         if self.args.storage_classes is not None:
549             storage_classes = self.args.storage_classes.replace(' ', '').split(',')
550             self.logger.info("Storage classes requested for new collections: {}".format(', '.join(storage_classes)))
551
552         if self.args.collection is not None:
553             # Set up the request handler with the collection at the root
554             # First check that the collection is readable
555             self.api.collections().get(uuid=self.args.collection).execute()
556             self.args.mode = 'collection'
557             dir_class = CollectionDirectory
558             dir_args.append(self.args.collection)
559         elif self.args.project is not None:
560             self.args.mode = 'project'
561             dir_class = ProjectDirectory
562             dir_args.append(
563                 self.api.groups().get(uuid=self.args.project).execute(
564                     num_retries=self.args.retries))
565
566         if (self.args.mount_by_id or
567             self.args.mount_by_pdh or
568             self.args.mount_by_tag or
569             self.args.mount_home or
570             self.args.mount_shared or
571             self.args.mount_tmp):
572             if self.args.mode is not None:
573                 sys.exit(
574                     "Cannot combine '{}' mode with custom --mount-* options.".
575                     format(self.args.mode))
576         elif self.args.mode is None:
577             # If no --mount-custom or custom mount args, --all is the default
578             self.args.mode = 'all'
579
580         if self.args.mode in ['by_id', 'by_pdh']:
581             # Set up the request handler with the 'magic directory' at the root
582             dir_class = MagicDirectory
583             dir_args.append(self.args.mode == 'by_pdh')
584         elif self.args.mode == 'by_tag':
585             dir_class = TagsDirectory
586         elif self.args.mode == 'shared':
587             dir_class = SharedDirectory
588             dir_args.append(usr)
589         elif self.args.mode == 'home':
590             dir_class = ProjectDirectory
591             dir_args.append(usr)
592         elif self.args.mode == 'all':
593             self.args.mount_by_id = ['by_id']
594             self.args.mount_by_tag = ['by_tag']
595             self.args.mount_home = ['home']
596             self.args.mount_shared = ['shared']
597             mount_readme = True
598
599         if dir_class is not None:
600             if dir_class in [TagsDirectory, CollectionDirectory]:
601                 ent = dir_class(*dir_args, poll_time=self.args.refresh_time)
602             else:
603                 ent = dir_class(*dir_args, storage_classes=storage_classes, poll_time=self.args.refresh_time)
604             self.operations.inodes.add_entry(ent)
605             self.listen_for_events = ent.want_event_subscribe()
606             return
607
608         e = self.operations.inodes.add_entry(Directory(
609             llfuse.ROOT_INODE,
610             self.operations.inodes,
611             self.args.enable_write,
612             self.args.filters,
613         ))
614         dir_args[0] = e.inode
615
616         for name in self.args.mount_by_id:
617             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=False,
618                                                     storage_classes=storage_classes,
619                                                     poll_time=self.args.refresh_time))
620         for name in self.args.mount_by_pdh:
621             self._add_mount(e, name, MagicDirectory(*dir_args, pdh_only=True,
622                                                     poll_time=self.args.refresh_time))
623         for name in self.args.mount_by_tag:
624             self._add_mount(e, name, TagsDirectory(*dir_args))
625         for name in self.args.mount_home:
626             self._add_mount(e, name, ProjectDirectory(*dir_args, project_object=usr,
627                                                       storage_classes=storage_classes,
628                                                       poll_time=self.args.refresh_time))
629         for name in self.args.mount_shared:
630             self._add_mount(e, name, SharedDirectory(*dir_args, exclude=usr,
631                                                      storage_classes=storage_classes,
632                                                      poll_time=self.args.refresh_time))
633         for name in self.args.mount_tmp:
634             self._add_mount(e, name, TmpCollectionDirectory(*dir_args,
635                                                             storage_classes=storage_classes))
636
637         if mount_readme:
638             text = self._readme_text(
639                 arvados.config.get('ARVADOS_API_HOST'),
640                 usr['email'])
641             self._add_mount(e, 'README', StringFile(e.inode, text, now))
642
643     def _add_mount(self, tld, name, ent):
644         if name in ['', '.', '..'] or '/' in name:
645             sys.exit("Mount point '{}' is not supported.".format(name))
646         tld._entries[name] = self.operations.inodes.add_entry(ent)
647         self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
648
649     def _readme_text(self, api_host, user_email):
650         return '''
651 Welcome to Arvados!  This directory provides file system access to
652 files and objects available on the Arvados installation located at
653 '{}' using credentials for user '{}'.
654
655 From here, the following directories are available:
656
657   by_id/     Access to Keep collections by uuid or portable data hash (see by_id/README for details).
658   by_tag/    Access to Keep collections organized by tag.
659   home/      The contents of your home project.
660   shared/    Projects shared with you.
661
662 '''.format(api_host, user_email)
663
664     def _run_exec(self):
665         rc = 255
666         with self:
667             try:
668                 sp = subprocess.Popen(self.args.exec_args, shell=False)
669
670                 # forward signals to the process.
671                 signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
672                 signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
673                 signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
674
675                 # wait for process to complete.
676                 rc = sp.wait()
677
678                 # restore default signal handlers.
679                 signal.signal(signal.SIGINT, signal.SIG_DFL)
680                 signal.signal(signal.SIGTERM, signal.SIG_DFL)
681                 signal.signal(signal.SIGQUIT, signal.SIG_DFL)
682             except Exception as e:
683                 self.logger.exception(
684                     'arv-mount: exception during exec %s', self.args.exec_args)
685                 try:
686                     rc = e.errno
687                 except AttributeError:
688                     pass
689         exit(rc)
690
691     def _run_standalone(self):
692         try:
693             self.daemon = not self.args.foreground
694             with self:
695                 self.llfuse_thread.join(timeout=None)
696         except Exception as e:
697             self.logger.exception('arv-mount: exception during mount: %s', e)
698             exit(getattr(e, 'errno', 1))
699         exit(0)
700
701     def _llfuse_main(self):
702         try:
703             llfuse.main(workers=10)
704         except:
705             llfuse.close(unmount=False)
706             raise
707         self.operations.begin_shutdown()
708         llfuse.close()