1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
30 from pathlib import Path, PurePath
# NOTE: `$` also matches immediately before a trailing newline, so
# `HEX_RE.match("ab\n")` succeeds. Use `HEX_RE.fullmatch()` when a trailing
# newline must be rejected.
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
"""Regular expression to match a hexadecimal string (case-insensitive)"""
CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
CR_COMMITTED = 'Committed'
"""Constant `state` value for committed container requests"""
51 """Constant `state` value for finalized container requests"""
# Keep locator, manifest, and `keep:` reference patterns.
# None of the patterns in this section starts with `^`, and only
# `manifest_pattern` contains `$`, so whether a pattern must cover the whole
# input is decided by the caller's choice of `match()`, `search()`, or
# `fullmatch()`.
keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
"""Regular expression to match any Keep block locator"""
signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
"""Regular expression to match any Keep block locator with an access token hint"""
portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
"""Regular expression to match any collection portable data hash"""
# With re.MULTILINE, the `$` inside the outer group matches at the end of
# every line, so each repetition of the group consumes one manifest line.
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
"""Regular expression to match an Arvados collection manifest text"""
# The next two patterns capture the portable data hash as group 1 and the
# path after the first slash as group 2.
keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a file path from a collection identified by portable data hash"""
keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
"""Regular expression to match a `keep:` URI with a collection identified by portable data hash"""

# UUID patterns. A UUID is three dash-separated fields of 5, 5, and 15
# lowercase alphanumeric characters. Each type-specific pattern below fixes
# the second field to a literal value; `uuid_pattern` accepts any value there.
# These patterns are unanchored as well.
uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
"""Regular expression to match any Arvados object UUID"""
collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
"""Regular expression to match any Arvados collection UUID"""
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
"""Regular expression to match any Arvados container UUID"""
group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
"""Regular expression to match any Arvados group UUID"""
link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
"""Regular expression to match any Arvados link UUID"""
user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
"""Regular expression to match any Arvados user UUID"""

# Module-level logger registered under the name 'arvados'.
logger = logging.getLogger('arvados')
def _deprecated(version=None, preferred=None):
    """Build a decorator that marks a callable as deprecated in the SDK

    The decorated callable emits a `DeprecationWarning` every time it is
    called, and a deprecation notice is inserted into its docstring.

    Both arguments are optional. When given, they are included in the
    warning message and in the docstring notice:

    * preferred: str | None --- The name of an alternative that users should
      use instead.

    * version: str | None --- The version of Arvados when the callable is
      scheduled to be removed.
    """
    version = '' if version is None else f' and scheduled to be removed in Arvados {version}'
    preferred = '' if preferred is None else f' Prefer {preferred} instead.'

    def deprecated_decorator(func):
        fullname = f'{func.__module__}.{func.__qualname__}'
        owner, _, leaf = fullname.rpartition('.')
        # A deprecated constructor is reported under its class's name.
        if leaf == '__init__':
            fullname = owner
        warning_msg = f'{fullname} is deprecated{version}.{preferred}'

        @functools.wraps(func)
        def deprecated_wrapper(*args, **kwargs):
            # stacklevel 2 attributes the warning to the caller of the
            # deprecated callable rather than to this wrapper.
            warnings.warn(warning_msg, DeprecationWarning, 2)
            return func(*args, **kwargs)

        # Drop the trailing newline and any whitespace-only tail from the
        # original docstring.
        func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
        # The indentation of the first indented, non-blank line is reused so
        # the notice lines up with the rest of the docstring body.
        indent_match = re.search(r'\n([ \t]+)\S', func_doc)
        indent = indent_match.group(1) if indent_match is not None else ''
        warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
        # Insert the notice at the first paragraph break so it becomes the
        # second "paragraph" of the docstring.
        docstring, inserted = re.subn(
            rf'\n[ \t]*\n{indent}',
            f'{warning_doc}\n\n{indent}',
            func_doc,
            count=1,
        )
        if inserted == 0:
            # No paragraph break to insert at: append the notice instead.
            docstring = f'{func_doc.lstrip()}{warning_doc}'
        deprecated_wrapper.__doc__ = docstring
        return deprecated_wrapper

    return deprecated_decorator
@dataclasses.dataclass
class _BaseDirectorySpec:
    """Parse base directories

    A _BaseDirectorySpec bundles the environment variable names and default
    values that describe one family of base directories (cache, config,
    state, etc.). Its methods only read the `env` mapping they are given and
    turn those settings into absolute paths; none of them modify the spec.
    """
    # Name of the variable read by iter_systemd(); its value is treated as a
    # colon-separated list of directories.
    systemd_key: str
    # Name of the variable holding the user's base directory override.
    xdg_home_key: str
    # Location of the default base directory, relative to the home directory.
    xdg_home_default: PurePath
    # Name of the variable holding extra colon-separated search directories,
    # or None when this directory family has no such list.
    xdg_dirs_key: Optional[str] = None
    # Search list used when the xdg_dirs_key variable is unset or empty.
    xdg_dirs_default: str = ''

    @staticmethod
    def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
        """Return `env[key]` as a Path if it is set and absolute, else None"""
        try:
            candidate = Path(env[key])
        except (KeyError, ValueError):
            return None
        return candidate if candidate.is_absolute() else None

    @staticmethod
    def _iter_abspaths(value: str) -> Iterator[Path]:
        """Yield the absolute entries of a colon-separated path list"""
        yield from filter(Path.is_absolute, map(Path, value.split(':')))

    def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
        """Iterate the absolute directories listed in `env[self.systemd_key]`"""
        listed = env.get(self.systemd_key, '')
        return self._iter_abspaths(listed)

    def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
        """Iterate `subdir` under the home directory, then under each search directory"""
        yield self.xdg_home(env, subdir)
        if self.xdg_dirs_key is None:
            return
        # An unset or empty variable falls back to the spec's default list.
        search_list = env.get(self.xdg_dirs_key) or self.xdg_dirs_default
        yield from (base / subdir for base in self._iter_abspaths(search_list))

    def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
        """Return `subdir` under the configured home directory, or under the default"""
        base = self._abspath_from_env(env, self.xdg_home_key)
        if not base:
            base = self.xdg_home_default_path(env)
        return base / subdir

    def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
        """Return the default home directory, rooted at `$HOME` or `Path.home()`"""
        user_home = self._abspath_from_env(env, 'HOME') or Path.home()
        return user_home / self.xdg_home_default

    def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
        """Report whether `env` sets an absolute home directory other than the default"""
        configured = self._abspath_from_env(env, self.xdg_home_key)
        if configured is None:
            return False
        return configured != self.xdg_home_default_path(env)
187 class _BaseDirectorySpecs(enum.Enum):
188 """Base directory specifications
190 This enum provides easy access to the standard base directory settings.
192 CACHE = _BaseDirectorySpec(
197 CONFIG = _BaseDirectorySpec(
198 'CONFIGURATION_DIRECTORY',
204 STATE = _BaseDirectorySpec(
207 PurePath('.local', 'state'),
211 class _BaseDirectories:
212 """Resolve paths from a base directory spec
214 Given a _BaseDirectorySpec, this class provides stateful methods to find
215 existing files and return the most-preferred directory for writing.
217 _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
221 spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
222 env: Mapping[str, str]=os.environ,
223 xdg_subdir: Union[os.PathLike, str]='arvados',
225 if isinstance(spec, str):
226 spec = _BaseDirectorySpecs[spec].value
227 elif isinstance(spec, _BaseDirectorySpecs):
231 self._xdg_subdir = PurePath(xdg_subdir)
def search(self, name: str) -> Iterator[Path]:
    """Yield each existing path named `name` in the search directories

    Candidate directories come from the spec's `iter_systemd` followed by
    its `iter_xdg` (using this object's environment and subdirectory), and
    are checked in that order.

    If nothing is found, the environment sets a non-default home directory
    for this spec, and `name` does exist under the default home directory,
    a warning is logged suggesting an environment setting that would make
    the file visible.

    Arguments:

    * name: str --- The file or directory name to look for under each
      search directory.
    """
    any_found = False
    for search_path in itertools.chain(
            self._spec.iter_systemd(self._env),
            self._spec.iter_xdg(self._env, self._xdg_subdir),
    ):
        path = search_path / name
        if path.exists():
            yield path
            any_found = True
    # The rest of this function is dedicated to warning the user if they
    # have a custom XDG_*_HOME value that prevented the search from
    # succeeding. This should be rare.
    if any_found or not self._spec.xdg_home_is_customized(self._env):
        return
    default_home = self._spec.xdg_home_default_path(self._env)
    default_path = Path(self._xdg_subdir / name)
    if not (default_home / default_path).exists():
        return
    if self._spec.xdg_dirs_key is None:
        # No search-list variable exists for this spec, so suggest pointing
        # the home variable back at the default location.
        suggest_key = self._spec.xdg_home_key
        # shlex.quote() only accepts str; passing the Path object here
        # raised TypeError instead of logging the warning.
        suggest_value = str(default_home)
    else:
        # Suggest appending the default location to the existing list.
        suggest_key = self._spec.xdg_dirs_key
        cur_value = self._env.get(suggest_key, '')
        value_sep = ':' if cur_value else ''
        suggest_value = f'{cur_value}{value_sep}{default_home}'
    logger.warning(
        "%s was not found under your configured $%s (%s), "
        "but does exist at the default location (%s) - "
        "consider running this program with the environment setting %s=%s",
        default_path,
        self._spec.xdg_home_key,
        self._spec.xdg_home(self._env, ''),
        default_home,
        suggest_key,
        shlex.quote(suggest_value),
    )
276 subdir: Union[str, os.PathLike]=PurePath(),
279 for path in self._spec.iter_systemd(self._env):
281 mode = path.stat().st_mode
284 if (mode & self._STORE_MODE) == self._STORE_MODE:
287 path = self._spec.xdg_home(self._env, self._xdg_subdir)
289 path.mkdir(parents=True, exist_ok=True, mode=mode)
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive. An empty string, or a string with any
    other character (including a trailing newline), is not hexadecimal.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check

    Raises `arvados.errors.ArgumentError` if more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        good_len = (length_args[0] <= len(s) <= length_args[1])
    elif num_length_args == 1:
        good_len = (len(s) == length_args[0])
    else:
        good_len = True
    # fullmatch() rather than match(): the `$` in HEX_RE also matches just
    # before a trailing newline, so match() accepted strings like "abc\n".
    return bool(good_len and HEX_RE.fullmatch(s))
324 fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
325 order_key: str="created_at",
327 ascending: bool=True,
329 ) -> Iterator[Dict[str, Any]]:
330 """Iterate all Arvados resources from an API list call
332 This method takes a method that represents an Arvados API list call, and
333 iterates the objects returned by the API server. It can make multiple API
334 calls to retrieve and iterate all objects available from the API server.
338 * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
339 function that wraps an Arvados API method that returns a list of
340 objects. If you have an Arvados API client named `arv`, examples
341 include `arv.collections().list` and `arv.groups().contents`. Note
342 that you should pass the function *without* calling it.
344 * order_key: str --- The name of the primary object field that objects
345 should be sorted by. This name is used to build an `order` argument
346 for `fn`. Default `'created_at'`.
348 * num_retries: int --- This argument is passed through to
349 `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
350 that method's docstring for details. Default 0 (meaning API calls will
351 use the `num_retries` value set when the Arvados API client was
354 * ascending: bool --- Used to build an `order` argument for `fn`. If True,
355 all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
356 fields will be sorted in `'desc'` (descending) order.
358 Additional keyword arguments will be passed directly to `fn` for each API
359 call. Note that this function sets `count`, `limit`, and `order` as part of
363 kwargs["limit"] = pagesize
364 kwargs["count"] = 'none'
365 asc = "asc" if ascending else "desc"
366 kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
367 other_filters = kwargs.get("filters", [])
370 select = set(kwargs['select'])
374 select.add(order_key)
376 kwargs['select'] = list(select)
380 expect_full_page = True
381 seen_prevpage = set()
382 seen_thispage = set()
384 prev_page_all_same_order_key = False
387 kwargs["filters"] = nextpage+other_filters
388 items = fn(**kwargs).execute(num_retries=num_retries)
390 if len(items["items"]) == 0:
391 if prev_page_all_same_order_key:
392 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
393 prev_page_all_same_order_key = False
398 seen_prevpage = seen_thispage
399 seen_thispage = set()
401 for i in items["items"]:
402 # In cases where there's more than one record with the
403 # same order key, the result could include records we
404 # already saw in the last page. Skip them.
405 if i["uuid"] in seen_prevpage:
407 seen_thispage.add(i["uuid"])
410 firstitem = items["items"][0]
411 lastitem = items["items"][-1]
413 if firstitem[order_key] == lastitem[order_key]:
414 # Got a page where every item has the same order key.
415 # Switch to using uuid for paging.
416 nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
417 prev_page_all_same_order_key = True
419 # Start from the last order key seen, but skip the last
420 # known uuid to avoid retrieving the same row twice. If
421 # there are multiple rows with the same order key it is
422 # still likely we'll end up retrieving duplicate rows.
423 # That's handled by tracking the "seen" rows for each page
424 # so they can be skipped if they show up on the next page.
425 nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
426 prev_page_all_same_order_key = False
def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
    """Return the path of the best available source of CA certificates

    The following locations are tried in order, and the first one that
    exists on the filesystem is returned:

    * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
    * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
    * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
      distributions
    * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
      distributions

    If none of these paths exist, this function returns the value of `fallback`.

    Arguments:

    * fallback: T --- The value to return if none of the known paths exist.
      The default is `httplib2.CA_CERTS`.
    """
    candidates = (
        # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
        # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
        os.environ.get('SSL_CERT_FILE'),
        # Arvados-specific store
        '/etc/arvados/ca-certificates.crt',
        # Debian-based distributions
        '/etc/ssl/certs/ca-certificates.crt',
        # Red Hat-based distributions
        '/etc/pki/tls/certs/ca-bundle.crt',
    )
    for candidate in candidates:
        # An unset or empty SSL_CERT_FILE is falsy and is skipped here.
        if candidate and os.path.exists(candidate):
            return candidate
    return fallback
466 def new_request_id() -> str:
467 """Return a random request ID
469 This function generates and returns a random string suitable for use as a
470 `X-Request-Id` header value in the Arvados API.
473 # 2**104 > 36**20 > 2**103
474 n = random.getrandbits(104)
478 rid += chr(c+ord('0'))
480 rid += chr(c+ord('a')-10)
def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's configuration, with caching

    This function gets and returns the Arvados configuration from the API
    server. It caches the result on the client object (as the attribute
    `_cached_config`) and reuses it on any later call with the same client.

    If the client's API description has no `configs` resource, no request
    is made and an empty dictionary is returned.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster configuration.
    """
    # Tolerate an API description with no 'resources' entry instead of
    # failing with AttributeError on None.
    resources = svc._rootDesc.get('resources') or {}
    if not resources.get('configs', False):
        # Old API server version, no config export endpoint
        return {}
    if not hasattr(svc, '_cached_config'):
        svc._cached_config = svc.configs().get().execute()
    return svc._cached_config
def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
    """Return an Arvados cluster's vocabulary, with caching

    This function gets and returns the Arvados vocabulary from the API
    server. It caches the result on the client object (as the attribute
    `_cached_vocabulary`) and reuses it on any later call with the same
    client.

    If the client's API description has no `vocabularies` resource, no
    request is made and an empty dictionary is returned.

    .. HINT:: Low-level method
       This is a relatively low-level wrapper around the Arvados API. Most
       users will prefer to use `arvados.vocabulary.load_vocabulary`.

    Arguments:

    * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
      object to use to retrieve and cache the Arvados cluster vocabulary.
    """
    # Tolerate an API description with no 'resources' entry instead of
    # failing with AttributeError on None.
    resources = svc._rootDesc.get('resources') or {}
    if not resources.get('vocabularies', False):
        # Old API server version, no vocabulary export endpoint
        return {}
    if not hasattr(svc, '_cached_vocabulary'):
        svc._cached_vocabulary = svc.vocabularies().get().execute()
    return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Limit the length of a name to fit within Arvados API limits

    Names of up to 226 characters (254 minus 28 reserved for text that the
    `ensure_unique_name` argument may append) are returned unchanged. A
    longer name keeps its first 113 characters, has just enough characters
    cut from the middle, and gets a single ellipsis character (`…`) in
    their place. The ellipsis is not counted against the 226-character
    budget, so a trimmed name is 227 characters long.

    Arguments:

    * collectionname: str --- The desired source name
    """
    max_name_len = 254 - 28
    excess = len(collectionname) - max_name_len
    if excess <= 0:
        return collectionname

    head_len = max_name_len // 2
    head = collectionname[:head_len]
    # Skip exactly `excess` characters after the head; keep the rest.
    tail = collectionname[head_len + excess:]
    return f"{head}…{tail}"