1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
31 from pathlib import Path, PurePath
46 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
47 """Regular expression to match a hexadecimal string (case-insensitive)"""
48 CR_UNCOMMITTED = 'Uncommitted'
"""Constant `state` value for uncommitted container requests"""
50 CR_COMMITTED = 'Committed'
51 """Constant `state` value for committed container requests"""
53 """Constant `state` value for finalized container requests"""
55 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
56 """Regular expression to match any Keep block locator"""
57 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
58 """Regular expression to match any Keep block locator with an access token hint"""
59 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
60 """Regular expression to match any collection portable data hash"""
61 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
62 """Regular expression to match an Arvados collection manifest text"""
63 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
64 """Regular expression to match a file path from a collection identified by portable data hash"""
65 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
66 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
68 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
69 """Regular expression to match any Arvados object UUID"""
70 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
71 """Regular expression to match any Arvados collection UUID"""
72 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
73 """Regular expression to match any Arvados container UUID"""
74 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
75 """Regular expression to match any Arvados group UUID"""
76 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
77 """Regular expression to match any Arvados link UUID"""
78 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
79 """Regular expression to match any Arvados user UUID"""
81 logger = logging.getLogger('arvados')
83 def _deprecated(version=None, preferred=None):
84 """Mark a callable as deprecated in the SDK
86 This will wrap the callable to emit as a DeprecationWarning
87 and add a deprecation notice to its docstring.
89 If the following arguments are given, they'll be included in the
92 * preferred: str | None --- The name of an alternative that users should
95 * version: str | None --- The version of Arvados when the callable is
96 scheduled to be removed.
101 version = f' and scheduled to be removed in Arvados {version}'
102 if preferred is None:
105 preferred = f' Prefer {preferred} instead.'
106 def deprecated_decorator(func):
107 fullname = f'{func.__module__}.{func.__qualname__}'
108 parent, _, name = fullname.rpartition('.')
109 if name == '__init__':
111 warning_msg = f'{fullname} is deprecated{version}.{preferred}'
112 @functools.wraps(func)
113 def deprecated_wrapper(*args, **kwargs):
114 warnings.warn(warning_msg, DeprecationWarning, 2)
115 return func(*args, **kwargs)
116 # Get func's docstring without any trailing newline or empty lines.
117 func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
118 match = re.search(r'\n([ \t]+)\S', func_doc)
119 indent = '' if match is None else match.group(1)
120 warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent} {warning_msg}'
121 # Make the deprecation notice the second "paragraph" of the
122 # docstring if possible. Otherwise append it.
123 docstring, count = re.subn(
124 rf'\n[ \t]*\n{indent}',
125 f'{warning_doc}\n\n{indent}',
130 docstring = f'{func_doc.lstrip()}{warning_doc}'
131 deprecated_wrapper.__doc__ = docstring
132 return deprecated_wrapper
133 return deprecated_decorator
135 @dataclasses.dataclass
136 class _BaseDirectorySpec:
137 """Parse base directories
139 A _BaseDirectorySpec defines all the environment variable keys and defaults
140 related to a set of base directories (cache, config, state, etc.). It
141 provides pure methods to parse environment settings into valid paths.
145 xdg_home_default: PurePath
146 xdg_dirs_key: Optional[str] = None
147 xdg_dirs_default: str = ''
150 def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
152 path = Path(env[key])
153 except (KeyError, ValueError):
156 ok = path.is_absolute()
157 return path if ok else None
160 def _iter_abspaths(value: str) -> Iterator[Path]:
161 for path_s in value.split(':'):
163 if path.is_absolute():
166 def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
167 return self._iter_abspaths(env.get(self.systemd_key, ''))
169 def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
170 yield self.xdg_home(env, subdir)
171 if self.xdg_dirs_key is not None:
172 for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
175 def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
177 self._abspath_from_env(env, self.xdg_home_key)
178 or self.xdg_home_default_path(env)
181 def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
182 return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default
184 def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
185 xdg_home = self._abspath_from_env(env, self.xdg_home_key)
186 return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
189 class _BaseDirectorySpecs(enum.Enum):
190 """Base directory specifications
192 This enum provides easy access to the standard base directory settings.
194 CACHE = _BaseDirectorySpec(
199 CONFIG = _BaseDirectorySpec(
200 'CONFIGURATION_DIRECTORY',
206 STATE = _BaseDirectorySpec(
209 PurePath('.local', 'state'),
213 class _BaseDirectories:
214 """Resolve paths from a base directory spec
216 Given a _BaseDirectorySpec, this class provides stateful methods to find
217 existing files and return the most-preferred directory for writing.
219 _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
223 spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
224 env: Mapping[str, str]=os.environ,
225 xdg_subdir: Union[os.PathLike, str]='arvados',
227 if isinstance(spec, str):
228 spec = _BaseDirectorySpecs[spec].value
229 elif isinstance(spec, _BaseDirectorySpecs):
233 self._xdg_subdir = PurePath(xdg_subdir)
235 def search(self, name: str) -> Iterator[Path]:
237 for search_path in itertools.chain(
238 self._spec.iter_systemd(self._env),
239 self._spec.iter_xdg(self._env, self._xdg_subdir),
241 path = search_path / name
245 # The rest of this function is dedicated to warning the user if they
246 # have a custom XDG_*_HOME value that prevented the search from
247 # succeeding. This should be rare.
248 if any_found or not self._spec.xdg_home_is_customized(self._env):
250 default_home = self._spec.xdg_home_default_path(self._env)
251 default_path = Path(self._xdg_subdir / name)
252 if not (default_home / default_path).exists():
254 if self._spec.xdg_dirs_key is None:
255 suggest_key = self._spec.xdg_home_key
256 suggest_value = default_home
258 suggest_key = self._spec.xdg_dirs_key
259 cur_value = self._env.get(suggest_key, '')
260 value_sep = ':' if cur_value else ''
261 suggest_value = f'{cur_value}{value_sep}{default_home}'
264 %s was not found under your configured $%s (%s), \
265 but does exist at the default location (%s) - \
266 consider running this program with the environment setting %s=%s\
269 self._spec.xdg_home_key,
270 self._spec.xdg_home(self._env, ''),
273 shlex.quote(suggest_value),
278 subdir: Union[str, os.PathLike]=PurePath(),
281 for path in self._spec.iter_systemd(self._env):
283 mode = path.stat().st_mode
286 if (mode & self._STORE_MODE) == self._STORE_MODE:
289 path = self._spec.xdg_home(self._env, self._xdg_subdir)
291 path.mkdir(parents=True, exist_ok=True, mode=mode)
def is_hex(s: str, *length_args: int) -> bool:
    """Indicate whether a string is a hexadecimal number

    This method returns true if all characters in the string are hexadecimal
    digits. It is case-insensitive. An empty string is never considered
    hexadecimal, and neither is a string with any trailing newline.

    You can also pass optional length arguments to check that the string has
    the expected number of digits. If you pass one integer, the string must
    have that length exactly, otherwise the method returns False. If you
    pass two integers, the string's length must fall within that minimum and
    maximum (inclusive), otherwise the method returns False.

    Arguments:

    * s: str --- The string to check

    * length_args: int --- Optional length limit(s) for the string to check

    Raises `arvados.errors.ArgumentError` if more than two length arguments
    are given.
    """
    num_length_args = len(length_args)
    if num_length_args > 2:
        raise arvados.errors.ArgumentError(
            "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
    elif num_length_args == 2:
        min_len, max_len = length_args
        good_len = min_len <= len(s) <= max_len
    elif num_length_args == 1:
        good_len = len(s) == length_args[0]
    else:
        # No length constraint was requested, so any length is acceptable.
        good_len = True
    # Use fullmatch rather than match: the `$` anchor in HEX_RE also matches
    # just before a trailing newline, so match() would accept 'abc\n'.
    return good_len and HEX_RE.fullmatch(s) is not None
326 fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
327 order_key: str="created_at",
329 ascending: bool=True,
330 key_fields: Container[str]=('uuid',),
332 ) -> Iterator[Dict[str, Any]]:
333 """Iterate all Arvados resources from an API list call
335 This method takes a method that represents an Arvados API list call, and
336 iterates the objects returned by the API server. It can make multiple API
337 calls to retrieve and iterate all objects available from the API server.
341 * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
342 function that wraps an Arvados API method that returns a list of
343 objects. If you have an Arvados API client named `arv`, examples
344 include `arv.collections().list` and `arv.groups().contents`. Note
345 that you should pass the function *without* calling it.
347 * order_key: str --- The name of the primary object field that objects
348 should be sorted by. This name is used to build an `order` argument
349 for `fn`. Default `'created_at'`.
351 * num_retries: int --- This argument is passed through to
352 `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
353 that method's docstring for details. Default 0 (meaning API calls will
354 use the `num_retries` value set when the Arvados API client was
357 * ascending: bool --- Used to build an `order` argument for `fn`. If True,
358 all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
359 fields will be sorted in `'desc'` (descending) order.
361 * key_fields: Container[str] --- One or two fields that constitute
362 a unique key for returned items. Normally this should be the
363 default value `('uuid',)`, unless `fn` returns
364 computed_permissions records, in which case it should be
365 `('user_uuid', 'target_uuid')`. If two fields are given, one of
366 them must be equal to `order_key`.
368 Additional keyword arguments will be passed directly to `fn` for each API
369 call. Note that this function sets `count`, `limit`, and `order` as part of
373 tiebreak_keys = set(key_fields) - {order_key}
374 if len(tiebreak_keys) == 0:
375 tiebreak_key = 'uuid'
376 elif len(tiebreak_keys) == 1:
377 tiebreak_key = tiebreak_keys.pop()
379 raise arvados.errors.ArgumentError(
380 "key_fields can have at most one entry that is not order_key")
383 kwargs["limit"] = pagesize
384 kwargs["count"] = 'none'
385 asc = "asc" if ascending else "desc"
386 kwargs["order"] = [f"{order_key} {asc}", f"{tiebreak_key} {asc}"]
387 other_filters = kwargs.get("filters", [])
389 if 'select' in kwargs:
390 kwargs['select'] = list({*kwargs['select'], *key_fields, order_key})
394 expect_full_page = True
395 key_getter = operator.itemgetter(*key_fields)
396 seen_prevpage = set()
397 seen_thispage = set()
399 prev_page_all_same_order_key = False
402 kwargs["filters"] = nextpage+other_filters
403 items = fn(**kwargs).execute(num_retries=num_retries)
405 if len(items["items"]) == 0:
406 if prev_page_all_same_order_key:
407 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
408 prev_page_all_same_order_key = False
413 seen_prevpage = seen_thispage
414 seen_thispage = set()
416 for i in items["items"]:
417 # In cases where there's more than one record with the
418 # same order key, the result could include records we
419 # already saw in the last page. Skip them.
420 seen_key = key_getter(i)
421 if seen_key in seen_prevpage:
423 seen_thispage.add(seen_key)
426 firstitem = items["items"][0]
427 lastitem = items["items"][-1]
429 if firstitem[order_key] == lastitem[order_key]:
430 # Got a page where every item has the same order key.
431 # Switch to using tiebreak key for paging.
432 nextpage = [[order_key, "=", lastitem[order_key]], [tiebreak_key, ">" if ascending else "<", lastitem[tiebreak_key]]]
433 prev_page_all_same_order_key = True
435 # Start from the last order key seen, but skip the last
436 # known uuid to avoid retrieving the same row twice. If
437 # there are multiple rows with the same order key it is
438 # still likely we'll end up retrieving duplicate rows.
439 # That's handled by tracking the "seen" rows for each page
440 # so they can be skipped if they show up on the next page.
441 nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]]]
442 if tiebreak_key == "uuid":
443 nextpage += [[tiebreak_key, "!=", lastitem[tiebreak_key]]]
444 prev_page_all_same_order_key = False
446 def iter_computed_permissions(
447 fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
448 order_key: str='user_uuid',
450 ascending: bool=True,
451 key_fields: Container[str]=('user_uuid', 'target_uuid'),
453 ) -> Iterator[Dict[str, Any]]:
454 """Iterate all `computed_permission` resources
456 This method is the same as `keyset_list_all`, except that its
457 default arguments are suitable for the computed_permissions API.
461 * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] ---
462 see `keyset_list_all`. Typically this is an instance of
463 `arvados.api_resources.ComputedPermissions.list`. Given an
464 Arvados API client named `arv`, typical usage is
465 `iter_computed_permissions(arv.computed_permissions().list)`.
467 * order_key: str --- see `keyset_list_all`. Default
470 * num_retries: int --- see `keyset_list_all`.
472 * ascending: bool --- see `keyset_list_all`.
474 * key_fields: Container[str] --- see `keyset_list_all`. Default
475 `('user_uuid', 'target_uuid')`.
478 return keyset_list_all(
481 num_retries=num_retries,
483 key_fields=key_fields,
486 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
487 """Return the path of the best available source of CA certificates
489 This function checks various known paths that provide trusted CA
490 certificates, and returns the first one that exists. It checks:
492 * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
493 * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
494 * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
496 * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
499 If none of these paths exist, this function returns the value of `fallback`.
503 * fallback: T --- The value to return if none of the known paths exist.
504 The default value is the certificate store of Mozilla's trusted CAs
505 included with the Python [certifi][] package.
507 [certifi]: https://pypi.org/project/certifi/
509 for ca_certs_path in [
510 # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
511 # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
512 os.environ.get('SSL_CERT_FILE'),
514 '/etc/arvados/ca-certificates.crt',
516 '/etc/ssl/certs/ca-certificates.crt',
518 '/etc/pki/tls/certs/ca-bundle.crt',
520 if ca_certs_path and os.path.exists(ca_certs_path):
524 def new_request_id() -> str:
525 """Return a random request ID
527 This function generates and returns a random string suitable for use as a
528 `X-Request-Id` header value in the Arvados API.
531 # 2**104 > 36**20 > 2**103
532 n = random.getrandbits(104)
536 rid += chr(c+ord('0'))
538 rid += chr(c+ord('a')-10)
542 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
543 """Return an Arvados cluster's configuration, with caching
545 This function gets and returns the Arvados configuration from the API
546 server. It caches the result on the client object and reuses it on any
551 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
552 object to use to retrieve and cache the Arvados cluster configuration.
554 if not svc._rootDesc.get('resources').get('configs', False):
555 # Old API server version, no config export endpoint
557 if not hasattr(svc, '_cached_config'):
558 svc._cached_config = svc.configs().get().execute()
559 return svc._cached_config
561 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
562 """Return an Arvados cluster's vocabulary, with caching
564 This function gets and returns the Arvados vocabulary from the API
565 server. It caches the result on the client object and reuses it on any
568 .. HINT:: Low-level method
569 This is a relatively low-level wrapper around the Arvados API. Most
570 users will prefer to use `arvados.vocabulary.load_vocabulary`.
574 * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
575 object to use to retrieve and cache the Arvados cluster vocabulary.
577 if not svc._rootDesc.get('resources').get('vocabularies', False):
578 # Old API server version, no vocabulary export endpoint
580 if not hasattr(svc, '_cached_vocabulary'):
581 svc._cached_vocabulary = svc.vocabularies().get().execute()
582 return svc._cached_vocabulary
def trim_name(collectionname: str) -> str:
    """Shorten a name so that it fits within Arvados API limits

    Names no longer than the limit are returned unchanged. For longer names,
    the excess characters are cut out of the middle of the string and
    replaced with a single ellipsis character (`…`), so both the start and
    the end of the source name are preserved.

    The limit is `254 - 28` = 226 characters, which leaves room for text that
    may be added by the `ensure_unique_name` argument. Because the ellipsis
    is inserted after the excess is removed, a shortened name is 227
    characters long.

    Arguments:

    * collectionname: str --- The desired source name
    """
    limit = 254 - 28
    excess = len(collectionname) - limit
    if excess <= 0:
        # Already short enough; nothing to trim.
        return collectionname

    # Keep the first half of the budget, skip the excess, keep the rest.
    head_len = limit // 2
    head = collectionname[:head_len]
    tail = collectionname[head_len + excess:]
    return f"{head}…{tail}"