Merge branch '21121-cluster-activity' refs #21121
[arvados.git] / sdk / python / arvados / _internal / basedirs.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Base directories utility module
5
6 This module provides a set of classes useful to search and manipulate base
7 directory defined by systemd and the XDG specification. Most users will just
8 instantiate and use `BaseDirectories`.
9 """
10
11 import dataclasses
12 import enum
13 import itertools
14 import logging
15 import os
16 import shlex
17 import stat
18
19 from pathlib import Path, PurePath
20 from typing import (
21     Iterator,
22     Mapping,
23     Optional,
24     Union,
25 )
26
27 logger = logging.getLogger('arvados')
28
29 @dataclasses.dataclass
30 class BaseDirectorySpec:
31     """Parse base directories
32
33     A BaseDirectorySpec defines all the environment variable keys and defaults
34     related to a set of base directories (cache, config, state, etc.). It
35     provides pure methods to parse environment settings into valid paths.
36     """
37     systemd_key: str
38     xdg_home_key: str
39     xdg_home_default: PurePath
40     xdg_dirs_key: Optional[str] = None
41     xdg_dirs_default: str = ''
42
43     @staticmethod
44     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
45         try:
46             path = Path(env[key])
47         except (KeyError, ValueError):
48             ok = False
49         else:
50             ok = path.is_absolute()
51         return path if ok else None
52
53     @staticmethod
54     def _iter_abspaths(value: str) -> Iterator[Path]:
55         for path_s in value.split(':'):
56             path = Path(path_s)
57             if path.is_absolute():
58                 yield path
59
60     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
61         return self._iter_abspaths(env.get(self.systemd_key, ''))
62
63     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
64         yield self.xdg_home(env, subdir)
65         if self.xdg_dirs_key is not None:
66             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
67                 yield path / subdir
68
69     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
70         return (
71             self._abspath_from_env(env, self.xdg_home_key)
72             or self.xdg_home_default_path(env)
73         ) / subdir
74
75     def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
76         return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default
77
78     def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
79         xdg_home = self._abspath_from_env(env, self.xdg_home_key)
80         return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
81
82
83 class BaseDirectorySpecs(enum.Enum):
84     """Base directory specifications
85
86     This enum provides easy access to the standard base directory settings.
87     """
88     CACHE = BaseDirectorySpec(
89         'CACHE_DIRECTORY',
90         'XDG_CACHE_HOME',
91         PurePath('.cache'),
92     )
93     CONFIG = BaseDirectorySpec(
94         'CONFIGURATION_DIRECTORY',
95         'XDG_CONFIG_HOME',
96         PurePath('.config'),
97         'XDG_CONFIG_DIRS',
98         '/etc/xdg',
99     )
100     STATE = BaseDirectorySpec(
101         'STATE_DIRECTORY',
102         'XDG_STATE_HOME',
103         PurePath('.local', 'state'),
104     )
105
106
107 class BaseDirectories:
108     """Resolve paths from a base directory spec
109
110     Given a BaseDirectorySpec, this class provides stateful methods to find
111     existing files and return the most-preferred directory for writing.
112     """
113     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
114
115     def __init__(
116             self,
117             spec: Union[BaseDirectorySpec, BaseDirectorySpecs, str],
118             env: Mapping[str, str]=os.environ,
119             xdg_subdir: Union[os.PathLike, str]='arvados',
120     ) -> None:
121         if isinstance(spec, str):
122             spec = BaseDirectorySpecs[spec].value
123         elif isinstance(spec, BaseDirectorySpecs):
124             spec = spec.value
125         self._spec = spec
126         self._env = env
127         self._xdg_subdir = PurePath(xdg_subdir)
128
129     def search(self, name: str) -> Iterator[Path]:
130         any_found = False
131         for search_path in itertools.chain(
132                 self._spec.iter_systemd(self._env),
133                 self._spec.iter_xdg(self._env, self._xdg_subdir),
134         ):
135             path = search_path / name
136             if path.exists():
137                 yield path
138                 any_found = True
139         # The rest of this function is dedicated to warning the user if they
140         # have a custom XDG_*_HOME value that prevented the search from
141         # succeeding. This should be rare.
142         if any_found or not self._spec.xdg_home_is_customized(self._env):
143             return
144         default_home = self._spec.xdg_home_default_path(self._env)
145         default_path = Path(self._xdg_subdir / name)
146         if not (default_home / default_path).exists():
147             return
148         if self._spec.xdg_dirs_key is None:
149             suggest_key = self._spec.xdg_home_key
150             suggest_value = default_home
151         else:
152             suggest_key = self._spec.xdg_dirs_key
153             cur_value = self._env.get(suggest_key, '')
154             value_sep = ':' if cur_value else ''
155             suggest_value = f'{cur_value}{value_sep}{default_home}'
156         logger.warning(
157             "\
158 %s was not found under your configured $%s (%s), \
159 but does exist at the default location (%s) - \
160 consider running this program with the environment setting %s=%s\
161 ",
162             default_path,
163             self._spec.xdg_home_key,
164             self._spec.xdg_home(self._env, ''),
165             default_home,
166             suggest_key,
167             shlex.quote(suggest_value),
168         )
169
170     def storage_path(
171             self,
172             subdir: Union[str, os.PathLike]=PurePath(),
173             mode: int=0o700,
174     ) -> Path:
175         for path in self._spec.iter_systemd(self._env):
176             try:
177                 mode = path.stat().st_mode
178             except OSError:
179                 continue
180             if (mode & self._STORE_MODE) == self._STORE_MODE:
181                 break
182         else:
183             path = self._spec.xdg_home(self._env, self._xdg_subdir)
184         path /= subdir
185         path.mkdir(parents=True, exist_ok=True, mode=mode)
186         return path