Merge branch '21762-flaky-search-test'
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import dataclasses
11 import enum
12 import errno
13 import fcntl
14 import functools
15 import hashlib
16 import httplib2
17 import itertools
18 import logging
19 import os
20 import random
21 import re
22 import shlex
23 import stat
24 import subprocess
25 import sys
26 import warnings
27
28 import arvados.errors
29
30 from pathlib import Path, PurePath
31 from typing import (
32     Any,
33     Callable,
34     Dict,
35     Iterator,
36     Mapping,
37     Optional,
38     TypeVar,
39     Union,
40 )
41
42 T = TypeVar('T')
43
44 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
45 """Regular expression to match a hexadecimal string (case-insensitive)"""
46 CR_UNCOMMITTED = 'Uncommitted'
47 """Constant `state` value for uncommited container requests"""
48 CR_COMMITTED = 'Committed'
49 """Constant `state` value for committed container requests"""
50 CR_FINAL = 'Final'
51 """Constant `state` value for finalized container requests"""
52
53 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
54 """Regular expression to match any Keep block locator"""
55 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
56 """Regular expression to match any Keep block locator with an access token hint"""
57 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
58 """Regular expression to match any collection portable data hash"""
59 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
60 """Regular expression to match an Arvados collection manifest text"""
61 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
62 """Regular expression to match a file path from a collection identified by portable data hash"""
63 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
64 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
65
66 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
67 """Regular expression to match any Arvados object UUID"""
68 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
69 """Regular expression to match any Arvados collection UUID"""
70 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
71 """Regular expression to match any Arvados container UUID"""
72 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
73 """Regular expression to match any Arvados group UUID"""
74 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
75 """Regular expression to match any Arvados link UUID"""
76 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
77 """Regular expression to match any Arvados user UUID"""
78
79 logger = logging.getLogger('arvados')
80
81 def _deprecated(version=None, preferred=None):
82     """Mark a callable as deprecated in the SDK
83
84     This will wrap the callable to emit as a DeprecationWarning
85     and add a deprecation notice to its docstring.
86
87     If the following arguments are given, they'll be included in the
88     notices:
89
90     * preferred: str | None --- The name of an alternative that users should
91       use instead.
92
93     * version: str | None --- The version of Arvados when the callable is
94       scheduled to be removed.
95     """
96     if version is None:
97         version = ''
98     else:
99         version = f' and scheduled to be removed in Arvados {version}'
100     if preferred is None:
101         preferred = ''
102     else:
103         preferred = f' Prefer {preferred} instead.'
104     def deprecated_decorator(func):
105         fullname = f'{func.__module__}.{func.__qualname__}'
106         parent, _, name = fullname.rpartition('.')
107         if name == '__init__':
108             fullname = parent
109         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
110         @functools.wraps(func)
111         def deprecated_wrapper(*args, **kwargs):
112             warnings.warn(warning_msg, DeprecationWarning, 2)
113             return func(*args, **kwargs)
114         # Get func's docstring without any trailing newline or empty lines.
115         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
116         match = re.search(r'\n([ \t]+)\S', func_doc)
117         indent = '' if match is None else match.group(1)
118         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
119         # Make the deprecation notice the second "paragraph" of the
120         # docstring if possible. Otherwise append it.
121         docstring, count = re.subn(
122             rf'\n[ \t]*\n{indent}',
123             f'{warning_doc}\n\n{indent}',
124             func_doc,
125             count=1,
126         )
127         if not count:
128             docstring = f'{func_doc.lstrip()}{warning_doc}'
129         deprecated_wrapper.__doc__ = docstring
130         return deprecated_wrapper
131     return deprecated_decorator
132
133 @dataclasses.dataclass
134 class _BaseDirectorySpec:
135     """Parse base directories
136
137     A _BaseDirectorySpec defines all the environment variable keys and defaults
138     related to a set of base directories (cache, config, state, etc.). It
139     provides pure methods to parse environment settings into valid paths.
140     """
141     systemd_key: str
142     xdg_home_key: str
143     xdg_home_default: PurePath
144     xdg_dirs_key: Optional[str] = None
145     xdg_dirs_default: str = ''
146
147     @staticmethod
148     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
149         try:
150             path = Path(env[key])
151         except (KeyError, ValueError):
152             ok = False
153         else:
154             ok = path.is_absolute()
155         return path if ok else None
156
157     @staticmethod
158     def _iter_abspaths(value: str) -> Iterator[Path]:
159         for path_s in value.split(':'):
160             path = Path(path_s)
161             if path.is_absolute():
162                 yield path
163
164     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
165         return self._iter_abspaths(env.get(self.systemd_key, ''))
166
167     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
168         yield self.xdg_home(env, subdir)
169         if self.xdg_dirs_key is not None:
170             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
171                 yield path / subdir
172
173     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
174         return (
175             self._abspath_from_env(env, self.xdg_home_key)
176             or self.xdg_home_default_path(env)
177         ) / subdir
178
179     def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
180         return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default
181
182     def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
183         xdg_home = self._abspath_from_env(env, self.xdg_home_key)
184         return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
185
186
187 class _BaseDirectorySpecs(enum.Enum):
188     """Base directory specifications
189
190     This enum provides easy access to the standard base directory settings.
191     """
192     CACHE = _BaseDirectorySpec(
193         'CACHE_DIRECTORY',
194         'XDG_CACHE_HOME',
195         PurePath('.cache'),
196     )
197     CONFIG = _BaseDirectorySpec(
198         'CONFIGURATION_DIRECTORY',
199         'XDG_CONFIG_HOME',
200         PurePath('.config'),
201         'XDG_CONFIG_DIRS',
202         '/etc/xdg',
203     )
204     STATE = _BaseDirectorySpec(
205         'STATE_DIRECTORY',
206         'XDG_STATE_HOME',
207         PurePath('.local', 'state'),
208     )
209
210
211 class _BaseDirectories:
212     """Resolve paths from a base directory spec
213
214     Given a _BaseDirectorySpec, this class provides stateful methods to find
215     existing files and return the most-preferred directory for writing.
216     """
217     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
218
219     def __init__(
220             self,
221             spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
222             env: Mapping[str, str]=os.environ,
223             xdg_subdir: Union[os.PathLike, str]='arvados',
224     ) -> None:
225         if isinstance(spec, str):
226             spec = _BaseDirectorySpecs[spec].value
227         elif isinstance(spec, _BaseDirectorySpecs):
228             spec = spec.value
229         self._spec = spec
230         self._env = env
231         self._xdg_subdir = PurePath(xdg_subdir)
232
233     def search(self, name: str) -> Iterator[Path]:
234         any_found = False
235         for search_path in itertools.chain(
236                 self._spec.iter_systemd(self._env),
237                 self._spec.iter_xdg(self._env, self._xdg_subdir),
238         ):
239             path = search_path / name
240             if path.exists():
241                 yield path
242                 any_found = True
243         # The rest of this function is dedicated to warning the user if they
244         # have a custom XDG_*_HOME value that prevented the search from
245         # succeeding. This should be rare.
246         if any_found or not self._spec.xdg_home_is_customized(self._env):
247             return
248         default_home = self._spec.xdg_home_default_path(self._env)
249         default_path = Path(self._xdg_subdir / name)
250         if not (default_home / default_path).exists():
251             return
252         if self._spec.xdg_dirs_key is None:
253             suggest_key = self._spec.xdg_home_key
254             suggest_value = default_home
255         else:
256             suggest_key = self._spec.xdg_dirs_key
257             cur_value = self._env.get(suggest_key, '')
258             value_sep = ':' if cur_value else ''
259             suggest_value = f'{cur_value}{value_sep}{default_home}'
260         logger.warning(
261             "\
262 %s was not found under your configured $%s (%s), \
263 but does exist at the default location (%s) - \
264 consider running this program with the environment setting %s=%s\
265 ",
266             default_path,
267             self._spec.xdg_home_key,
268             self._spec.xdg_home(self._env, ''),
269             default_home,
270             suggest_key,
271             shlex.quote(suggest_value),
272         )
273
274     def storage_path(
275             self,
276             subdir: Union[str, os.PathLike]=PurePath(),
277             mode: int=0o700,
278     ) -> Path:
279         for path in self._spec.iter_systemd(self._env):
280             try:
281                 mode = path.stat().st_mode
282             except OSError:
283                 continue
284             if (mode & self._STORE_MODE) == self._STORE_MODE:
285                 break
286         else:
287             path = self._spec.xdg_home(self._env, self._xdg_subdir)
288         path /= subdir
289         path.mkdir(parents=True, exist_ok=True, mode=mode)
290         return path
291
292
293 def is_hex(s: str, *length_args: int) -> bool:
294     """Indicate whether a string is a hexadecimal number
295
296     This method returns true if all characters in the string are hexadecimal
297     digits. It is case-insensitive.
298
299     You can also pass optional length arguments to check that the string has
300     the expected number of digits. If you pass one integer, the string must
301     have that length exactly, otherwise the method returns False. If you
302     pass two integers, the string's length must fall within that minimum and
303     maximum (inclusive), otherwise the method returns False.
304
305     Arguments:
306
307     * s: str --- The string to check
308
309     * length_args: int --- Optional length limit(s) for the string to check
310     """
311     num_length_args = len(length_args)
312     if num_length_args > 2:
313         raise arvados.errors.ArgumentError(
314             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
315     elif num_length_args == 2:
316         good_len = (length_args[0] <= len(s) <= length_args[1])
317     elif num_length_args == 1:
318         good_len = (len(s) == length_args[0])
319     else:
320         good_len = True
321     return bool(good_len and HEX_RE.match(s))
322
323 def keyset_list_all(
324         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
325         order_key: str="created_at",
326         num_retries: int=0,
327         ascending: bool=True,
328         **kwargs: Any,
329 ) -> Iterator[Dict[str, Any]]:
330     """Iterate all Arvados resources from an API list call
331
332     This method takes a method that represents an Arvados API list call, and
333     iterates the objects returned by the API server. It can make multiple API
334     calls to retrieve and iterate all objects available from the API server.
335
336     Arguments:
337
338     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
339       function that wraps an Arvados API method that returns a list of
340       objects. If you have an Arvados API client named `arv`, examples
341       include `arv.collections().list` and `arv.groups().contents`. Note
342       that you should pass the function *without* calling it.
343
344     * order_key: str --- The name of the primary object field that objects
345       should be sorted by. This name is used to build an `order` argument
346       for `fn`. Default `'created_at'`.
347
348     * num_retries: int --- This argument is passed through to
349       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
350       that method's docstring for details. Default 0 (meaning API calls will
351       use the `num_retries` value set when the Arvados API client was
352       constructed).
353
354     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
355       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
356       fields will be sorted in `'desc'` (descending) order.
357
358     Additional keyword arguments will be passed directly to `fn` for each API
359     call. Note that this function sets `count`, `limit`, and `order` as part of
360     its work.
361     """
362     pagesize = 1000
363     kwargs["limit"] = pagesize
364     kwargs["count"] = 'none'
365     asc = "asc" if ascending else "desc"
366     kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
367     other_filters = kwargs.get("filters", [])
368
369     try:
370         select = set(kwargs['select'])
371     except KeyError:
372         pass
373     else:
374         select.add(order_key)
375         select.add('uuid')
376         kwargs['select'] = list(select)
377
378     nextpage = []
379     tot = 0
380     expect_full_page = True
381     seen_prevpage = set()
382     seen_thispage = set()
383     lastitem = None
384     prev_page_all_same_order_key = False
385
386     while True:
387         kwargs["filters"] = nextpage+other_filters
388         items = fn(**kwargs).execute(num_retries=num_retries)
389
390         if len(items["items"]) == 0:
391             if prev_page_all_same_order_key:
392                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
393                 prev_page_all_same_order_key = False
394                 continue
395             else:
396                 return
397
398         seen_prevpage = seen_thispage
399         seen_thispage = set()
400
401         for i in items["items"]:
402             # In cases where there's more than one record with the
403             # same order key, the result could include records we
404             # already saw in the last page.  Skip them.
405             if i["uuid"] in seen_prevpage:
406                 continue
407             seen_thispage.add(i["uuid"])
408             yield i
409
410         firstitem = items["items"][0]
411         lastitem = items["items"][-1]
412
413         if firstitem[order_key] == lastitem[order_key]:
414             # Got a page where every item has the same order key.
415             # Switch to using uuid for paging.
416             nextpage = [[order_key, "=", lastitem[order_key]], ["uuid", ">" if ascending else "<", lastitem["uuid"]]]
417             prev_page_all_same_order_key = True
418         else:
419             # Start from the last order key seen, but skip the last
420             # known uuid to avoid retrieving the same row twice.  If
421             # there are multiple rows with the same order key it is
422             # still likely we'll end up retrieving duplicate rows.
423             # That's handled by tracking the "seen" rows for each page
424             # so they can be skipped if they show up on the next page.
425             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
426             prev_page_all_same_order_key = False
427
428 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
429     """Return the path of the best available source of CA certificates
430
431     This function checks various known paths that provide trusted CA
432     certificates, and returns the first one that exists. It checks:
433
434     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
435     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
436     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
437       distributions
438     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
439       distributions
440
441     If none of these paths exist, this function returns the value of `fallback`.
442
443     Arguments:
444
445     * fallback: T --- The value to return if none of the known paths exist.
446       The default value is the certificate store of Mozilla's trusted CAs
447       included with the Python [certifi][] package.
448
449     [certifi]: https://pypi.org/project/certifi/
450     """
451     for ca_certs_path in [
452         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
453         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
454         os.environ.get('SSL_CERT_FILE'),
455         # Arvados specific:
456         '/etc/arvados/ca-certificates.crt',
457         # Debian:
458         '/etc/ssl/certs/ca-certificates.crt',
459         # Red Hat:
460         '/etc/pki/tls/certs/ca-bundle.crt',
461         ]:
462         if ca_certs_path and os.path.exists(ca_certs_path):
463             return ca_certs_path
464     return fallback
465
466 def new_request_id() -> str:
467     """Return a random request ID
468
469     This function generates and returns a random string suitable for use as a
470     `X-Request-Id` header value in the Arvados API.
471     """
472     rid = "req-"
473     # 2**104 > 36**20 > 2**103
474     n = random.getrandbits(104)
475     for _ in range(20):
476         c = n % 36
477         if c < 10:
478             rid += chr(c+ord('0'))
479         else:
480             rid += chr(c+ord('a')-10)
481         n = n // 36
482     return rid
483
484 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
485     """Return an Arvados cluster's configuration, with caching
486
487     This function gets and returns the Arvados configuration from the API
488     server. It caches the result on the client object and reuses it on any
489     future calls.
490
491     Arguments:
492
493     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
494       object to use to retrieve and cache the Arvados cluster configuration.
495     """
496     if not svc._rootDesc.get('resources').get('configs', False):
497         # Old API server version, no config export endpoint
498         return {}
499     if not hasattr(svc, '_cached_config'):
500         svc._cached_config = svc.configs().get().execute()
501     return svc._cached_config
502
503 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
504     """Return an Arvados cluster's vocabulary, with caching
505
506     This function gets and returns the Arvados vocabulary from the API
507     server. It caches the result on the client object and reuses it on any
508     future calls.
509
510     .. HINT:: Low-level method
511        This is a relatively low-level wrapper around the Arvados API. Most
512        users will prefer to use `arvados.vocabulary.load_vocabulary`.
513
514     Arguments:
515
516     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
517       object to use to retrieve and cache the Arvados cluster vocabulary.
518     """
519     if not svc._rootDesc.get('resources').get('vocabularies', False):
520         # Old API server version, no vocabulary export endpoint
521         return {}
522     if not hasattr(svc, '_cached_vocabulary'):
523         svc._cached_vocabulary = svc.vocabularies().get().execute()
524     return svc._cached_vocabulary
525
526 def trim_name(collectionname: str) -> str:
527     """Limit the length of a name to fit within Arvados API limits
528
529     This function ensures that a string is short enough to use as an object
530     name in the Arvados API, leaving room for text that may be added by the
531     `ensure_unique_name` argument. If the source name is short enough, it is
532     returned unchanged. Otherwise, this function returns a string with excess
533     characters removed from the middle of the source string and replaced with
534     an ellipsis.
535
536     Arguments:
537
538     * collectionname: str --- The desired source name
539     """
540     max_name_len = 254 - 28
541
542     if len(collectionname) > max_name_len:
543         over = len(collectionname) - max_name_len
544         split = int(max_name_len/2)
545         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
546
547     return collectionname