21935: Rename safeapi.ThreadSafeApiCache to api.ThreadSafeAPIClient
[arvados.git] / sdk / python / arvados / util.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados utilities
5
6 This module provides functions and constants that are useful across a variety
7 of Arvados resource types, or extend the Arvados API client (see `arvados.api`).
8 """
9
10 import dataclasses
11 import enum
12 import errno
13 import fcntl
14 import functools
15 import hashlib
16 import httplib2
17 import itertools
18 import logging
19 import operator
20 import os
21 import random
22 import re
23 import shlex
24 import stat
25 import subprocess
26 import sys
27 import warnings
28
29 import arvados.errors
30
31 from pathlib import Path, PurePath
32 from typing import (
33     Any,
34     Callable,
35     Container,
36     Dict,
37     Iterator,
38     Mapping,
39     Optional,
40     TypeVar,
41     Union,
42 )
43
44 T = TypeVar('T')
45
46 HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
47 """Regular expression to match a hexadecimal string (case-insensitive)"""
48 CR_UNCOMMITTED = 'Uncommitted'
49 """Constant `state` value for uncommited container requests"""
50 CR_COMMITTED = 'Committed'
51 """Constant `state` value for committed container requests"""
52 CR_FINAL = 'Final'
53 """Constant `state` value for finalized container requests"""
54
55 keep_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*')
56 """Regular expression to match any Keep block locator"""
57 signed_locator_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+(\+\S+)*\+A\S+(\+\S+)*')
58 """Regular expression to match any Keep block locator with an access token hint"""
59 portable_data_hash_pattern = re.compile(r'[0-9a-f]{32}\+[0-9]+')
60 """Regular expression to match any collection portable data hash"""
61 manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+[0-9]+)(\+\S+)*)+( +[0-9]+:[0-9]+:\S+)+$)+', flags=re.MULTILINE)
62 """Regular expression to match an Arvados collection manifest text"""
63 keep_file_locator_pattern = re.compile(r'([0-9a-f]{32}\+[0-9]+)/(.*)')
64 """Regular expression to match a file path from a collection identified by portable data hash"""
65 keepuri_pattern = re.compile(r'keep:([0-9a-f]{32}\+[0-9]+)/(.*)')
66 """Regular expression to match a `keep:` URI with a collection identified by portable data hash"""
67
68 uuid_pattern = re.compile(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}')
69 """Regular expression to match any Arvados object UUID"""
70 collection_uuid_pattern = re.compile(r'[a-z0-9]{5}-4zz18-[a-z0-9]{15}')
71 """Regular expression to match any Arvados collection UUID"""
72 container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
73 """Regular expression to match any Arvados container UUID"""
74 group_uuid_pattern = re.compile(r'[a-z0-9]{5}-j7d0g-[a-z0-9]{15}')
75 """Regular expression to match any Arvados group UUID"""
76 link_uuid_pattern = re.compile(r'[a-z0-9]{5}-o0j2j-[a-z0-9]{15}')
77 """Regular expression to match any Arvados link UUID"""
78 user_uuid_pattern = re.compile(r'[a-z0-9]{5}-tpzed-[a-z0-9]{15}')
79 """Regular expression to match any Arvados user UUID"""
80
81 logger = logging.getLogger('arvados')
82
83 def _deprecated(version=None, preferred=None):
84     """Mark a callable as deprecated in the SDK
85
86     This will wrap the callable to emit as a DeprecationWarning
87     and add a deprecation notice to its docstring.
88
89     If the following arguments are given, they'll be included in the
90     notices:
91
92     * preferred: str | None --- The name of an alternative that users should
93       use instead.
94
95     * version: str | None --- The version of Arvados when the callable is
96       scheduled to be removed.
97     """
98     if version is None:
99         version = ''
100     else:
101         version = f' and scheduled to be removed in Arvados {version}'
102     if preferred is None:
103         preferred = ''
104     else:
105         preferred = f' Prefer {preferred} instead.'
106     def deprecated_decorator(func):
107         fullname = f'{func.__module__}.{func.__qualname__}'
108         parent, _, name = fullname.rpartition('.')
109         if name == '__init__':
110             fullname = parent
111         warning_msg = f'{fullname} is deprecated{version}.{preferred}'
112         @functools.wraps(func)
113         def deprecated_wrapper(*args, **kwargs):
114             warnings.warn(warning_msg, DeprecationWarning, 2)
115             return func(*args, **kwargs)
116         # Get func's docstring without any trailing newline or empty lines.
117         func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
118         match = re.search(r'\n([ \t]+)\S', func_doc)
119         indent = '' if match is None else match.group(1)
120         warning_doc = f'\n\n{indent}.. WARNING:: Deprecated\n{indent}   {warning_msg}'
121         # Make the deprecation notice the second "paragraph" of the
122         # docstring if possible. Otherwise append it.
123         docstring, count = re.subn(
124             rf'\n[ \t]*\n{indent}',
125             f'{warning_doc}\n\n{indent}',
126             func_doc,
127             count=1,
128         )
129         if not count:
130             docstring = f'{func_doc.lstrip()}{warning_doc}'
131         deprecated_wrapper.__doc__ = docstring
132         return deprecated_wrapper
133     return deprecated_decorator
134
135 @dataclasses.dataclass
136 class _BaseDirectorySpec:
137     """Parse base directories
138
139     A _BaseDirectorySpec defines all the environment variable keys and defaults
140     related to a set of base directories (cache, config, state, etc.). It
141     provides pure methods to parse environment settings into valid paths.
142     """
143     systemd_key: str
144     xdg_home_key: str
145     xdg_home_default: PurePath
146     xdg_dirs_key: Optional[str] = None
147     xdg_dirs_default: str = ''
148
149     @staticmethod
150     def _abspath_from_env(env: Mapping[str, str], key: str) -> Optional[Path]:
151         try:
152             path = Path(env[key])
153         except (KeyError, ValueError):
154             ok = False
155         else:
156             ok = path.is_absolute()
157         return path if ok else None
158
159     @staticmethod
160     def _iter_abspaths(value: str) -> Iterator[Path]:
161         for path_s in value.split(':'):
162             path = Path(path_s)
163             if path.is_absolute():
164                 yield path
165
166     def iter_systemd(self, env: Mapping[str, str]) -> Iterator[Path]:
167         return self._iter_abspaths(env.get(self.systemd_key, ''))
168
169     def iter_xdg(self, env: Mapping[str, str], subdir: PurePath) -> Iterator[Path]:
170         yield self.xdg_home(env, subdir)
171         if self.xdg_dirs_key is not None:
172             for path in self._iter_abspaths(env.get(self.xdg_dirs_key) or self.xdg_dirs_default):
173                 yield path / subdir
174
175     def xdg_home(self, env: Mapping[str, str], subdir: PurePath) -> Path:
176         return (
177             self._abspath_from_env(env, self.xdg_home_key)
178             or self.xdg_home_default_path(env)
179         ) / subdir
180
181     def xdg_home_default_path(self, env: Mapping[str, str]) -> Path:
182         return (self._abspath_from_env(env, 'HOME') or Path.home()) / self.xdg_home_default
183
184     def xdg_home_is_customized(self, env: Mapping[str, str]) -> bool:
185         xdg_home = self._abspath_from_env(env, self.xdg_home_key)
186         return xdg_home is not None and xdg_home != self.xdg_home_default_path(env)
187
188
189 class _BaseDirectorySpecs(enum.Enum):
190     """Base directory specifications
191
192     This enum provides easy access to the standard base directory settings.
193     """
194     CACHE = _BaseDirectorySpec(
195         'CACHE_DIRECTORY',
196         'XDG_CACHE_HOME',
197         PurePath('.cache'),
198     )
199     CONFIG = _BaseDirectorySpec(
200         'CONFIGURATION_DIRECTORY',
201         'XDG_CONFIG_HOME',
202         PurePath('.config'),
203         'XDG_CONFIG_DIRS',
204         '/etc/xdg',
205     )
206     STATE = _BaseDirectorySpec(
207         'STATE_DIRECTORY',
208         'XDG_STATE_HOME',
209         PurePath('.local', 'state'),
210     )
211
212
213 class _BaseDirectories:
214     """Resolve paths from a base directory spec
215
216     Given a _BaseDirectorySpec, this class provides stateful methods to find
217     existing files and return the most-preferred directory for writing.
218     """
219     _STORE_MODE = stat.S_IFDIR | stat.S_IWUSR
220
221     def __init__(
222             self,
223             spec: Union[_BaseDirectorySpec, _BaseDirectorySpecs, str],
224             env: Mapping[str, str]=os.environ,
225             xdg_subdir: Union[os.PathLike, str]='arvados',
226     ) -> None:
227         if isinstance(spec, str):
228             spec = _BaseDirectorySpecs[spec].value
229         elif isinstance(spec, _BaseDirectorySpecs):
230             spec = spec.value
231         self._spec = spec
232         self._env = env
233         self._xdg_subdir = PurePath(xdg_subdir)
234
235     def search(self, name: str) -> Iterator[Path]:
236         any_found = False
237         for search_path in itertools.chain(
238                 self._spec.iter_systemd(self._env),
239                 self._spec.iter_xdg(self._env, self._xdg_subdir),
240         ):
241             path = search_path / name
242             if path.exists():
243                 yield path
244                 any_found = True
245         # The rest of this function is dedicated to warning the user if they
246         # have a custom XDG_*_HOME value that prevented the search from
247         # succeeding. This should be rare.
248         if any_found or not self._spec.xdg_home_is_customized(self._env):
249             return
250         default_home = self._spec.xdg_home_default_path(self._env)
251         default_path = Path(self._xdg_subdir / name)
252         if not (default_home / default_path).exists():
253             return
254         if self._spec.xdg_dirs_key is None:
255             suggest_key = self._spec.xdg_home_key
256             suggest_value = default_home
257         else:
258             suggest_key = self._spec.xdg_dirs_key
259             cur_value = self._env.get(suggest_key, '')
260             value_sep = ':' if cur_value else ''
261             suggest_value = f'{cur_value}{value_sep}{default_home}'
262         logger.warning(
263             "\
264 %s was not found under your configured $%s (%s), \
265 but does exist at the default location (%s) - \
266 consider running this program with the environment setting %s=%s\
267 ",
268             default_path,
269             self._spec.xdg_home_key,
270             self._spec.xdg_home(self._env, ''),
271             default_home,
272             suggest_key,
273             shlex.quote(suggest_value),
274         )
275
276     def storage_path(
277             self,
278             subdir: Union[str, os.PathLike]=PurePath(),
279             mode: int=0o700,
280     ) -> Path:
281         for path in self._spec.iter_systemd(self._env):
282             try:
283                 mode = path.stat().st_mode
284             except OSError:
285                 continue
286             if (mode & self._STORE_MODE) == self._STORE_MODE:
287                 break
288         else:
289             path = self._spec.xdg_home(self._env, self._xdg_subdir)
290         path /= subdir
291         path.mkdir(parents=True, exist_ok=True, mode=mode)
292         return path
293
294
295 def is_hex(s: str, *length_args: int) -> bool:
296     """Indicate whether a string is a hexadecimal number
297
298     This method returns true if all characters in the string are hexadecimal
299     digits. It is case-insensitive.
300
301     You can also pass optional length arguments to check that the string has
302     the expected number of digits. If you pass one integer, the string must
303     have that length exactly, otherwise the method returns False. If you
304     pass two integers, the string's length must fall within that minimum and
305     maximum (inclusive), otherwise the method returns False.
306
307     Arguments:
308
309     * s: str --- The string to check
310
311     * length_args: int --- Optional length limit(s) for the string to check
312     """
313     num_length_args = len(length_args)
314     if num_length_args > 2:
315         raise arvados.errors.ArgumentError(
316             "is_hex accepts up to 3 arguments ({} given)".format(1 + num_length_args))
317     elif num_length_args == 2:
318         good_len = (length_args[0] <= len(s) <= length_args[1])
319     elif num_length_args == 1:
320         good_len = (len(s) == length_args[0])
321     else:
322         good_len = True
323     return bool(good_len and HEX_RE.match(s))
324
325 def keyset_list_all(
326         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
327         order_key: str="created_at",
328         num_retries: int=0,
329         ascending: bool=True,
330         key_fields: Container[str]=('uuid',),
331         **kwargs: Any,
332 ) -> Iterator[Dict[str, Any]]:
333     """Iterate all Arvados resources from an API list call
334
335     This method takes a method that represents an Arvados API list call, and
336     iterates the objects returned by the API server. It can make multiple API
337     calls to retrieve and iterate all objects available from the API server.
338
339     Arguments:
340
341     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] --- A
342       function that wraps an Arvados API method that returns a list of
343       objects. If you have an Arvados API client named `arv`, examples
344       include `arv.collections().list` and `arv.groups().contents`. Note
345       that you should pass the function *without* calling it.
346
347     * order_key: str --- The name of the primary object field that objects
348       should be sorted by. This name is used to build an `order` argument
349       for `fn`. Default `'created_at'`.
350
351     * num_retries: int --- This argument is passed through to
352       `arvados.api_resources.ArvadosAPIRequest.execute` for each API call. See
353       that method's docstring for details. Default 0 (meaning API calls will
354       use the `num_retries` value set when the Arvados API client was
355       constructed).
356
357     * ascending: bool --- Used to build an `order` argument for `fn`. If True,
358       all fields will be sorted in `'asc'` (ascending) order. Otherwise, all
359       fields will be sorted in `'desc'` (descending) order.
360
361     * key_fields: Container[str] --- One or two fields that constitute
362       a unique key for returned items.  Normally this should be the
363       default value `('uuid',)`, unless `fn` returns
364       computed_permissions records, in which case it should be
365       `('user_uuid', 'target_uuid')`.  If two fields are given, one of
366       them must be equal to `order_key`.
367
368     Additional keyword arguments will be passed directly to `fn` for each API
369     call. Note that this function sets `count`, `limit`, and `order` as part of
370     its work.
371
372     """
373     tiebreak_keys = set(key_fields) - {order_key}
374     if len(tiebreak_keys) == 0:
375         tiebreak_key = 'uuid'
376     elif len(tiebreak_keys) == 1:
377         tiebreak_key = tiebreak_keys.pop()
378     else:
379         raise arvados.errors.ArgumentError(
380             "key_fields can have at most one entry that is not order_key")
381
382     pagesize = 1000
383     kwargs["limit"] = pagesize
384     kwargs["count"] = 'none'
385     asc = "asc" if ascending else "desc"
386     kwargs["order"] = [f"{order_key} {asc}", f"{tiebreak_key} {asc}"]
387     other_filters = kwargs.get("filters", [])
388
389     if 'select' in kwargs:
390         kwargs['select'] = list({*kwargs['select'], *key_fields, order_key})
391
392     nextpage = []
393     tot = 0
394     expect_full_page = True
395     key_getter = operator.itemgetter(*key_fields)
396     seen_prevpage = set()
397     seen_thispage = set()
398     lastitem = None
399     prev_page_all_same_order_key = False
400
401     while True:
402         kwargs["filters"] = nextpage+other_filters
403         items = fn(**kwargs).execute(num_retries=num_retries)
404
405         if len(items["items"]) == 0:
406             if prev_page_all_same_order_key:
407                 nextpage = [[order_key, ">" if ascending else "<", lastitem[order_key]]]
408                 prev_page_all_same_order_key = False
409                 continue
410             else:
411                 return
412
413         seen_prevpage = seen_thispage
414         seen_thispage = set()
415
416         for i in items["items"]:
417             # In cases where there's more than one record with the
418             # same order key, the result could include records we
419             # already saw in the last page.  Skip them.
420             seen_key = key_getter(i)
421             if seen_key in seen_prevpage:
422                 continue
423             seen_thispage.add(seen_key)
424             yield i
425
426         firstitem = items["items"][0]
427         lastitem = items["items"][-1]
428
429         if firstitem[order_key] == lastitem[order_key]:
430             # Got a page where every item has the same order key.
431             # Switch to using tiebreak key for paging.
432             nextpage = [[order_key, "=", lastitem[order_key]], [tiebreak_key, ">" if ascending else "<", lastitem[tiebreak_key]]]
433             prev_page_all_same_order_key = True
434         else:
435             # Start from the last order key seen, but skip the last
436             # known uuid to avoid retrieving the same row twice.  If
437             # there are multiple rows with the same order key it is
438             # still likely we'll end up retrieving duplicate rows.
439             # That's handled by tracking the "seen" rows for each page
440             # so they can be skipped if they show up on the next page.
441             nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]]]
442             if tiebreak_key == "uuid":
443                 nextpage += [[tiebreak_key, "!=", lastitem[tiebreak_key]]]
444             prev_page_all_same_order_key = False
445
446 def iter_computed_permissions(
447         fn: Callable[..., 'arvados.api_resources.ArvadosAPIRequest'],
448         order_key: str='user_uuid',
449         num_retries: int=0,
450         ascending: bool=True,
451         key_fields: Container[str]=('user_uuid', 'target_uuid'),
452         **kwargs: Any,
453 ) -> Iterator[Dict[str, Any]]:
454     """Iterate all `computed_permission` resources
455
456     This method is the same as `keyset_list_all`, except that its
457     default arguments are suitable for the computed_permissions API.
458
459     Arguments:
460
461     * fn: Callable[..., arvados.api_resources.ArvadosAPIRequest] ---
462       see `keyset_list_all`.  Typically this is an instance of
463       `arvados.api_resources.ComputedPermissions.list`.  Given an
464       Arvados API client named `arv`, typical usage is
465       `iter_computed_permissions(arv.computed_permissions().list)`.
466
467     * order_key: str --- see `keyset_list_all`.  Default
468       `'user_uuid'`.
469
470     * num_retries: int --- see `keyset_list_all`.
471
472     * ascending: bool --- see `keyset_list_all`.
473
474     * key_fields: Container[str] --- see `keyset_list_all`. Default
475       `('user_uuid', 'target_uuid')`.
476
477     """
478     return keyset_list_all(
479         fn=fn,
480         order_key=order_key,
481         num_retries=num_retries,
482         ascending=ascending,
483         key_fields=key_fields,
484         **kwargs)
485
486 def ca_certs_path(fallback: T=httplib2.CA_CERTS) -> Union[str, T]:
487     """Return the path of the best available source of CA certificates
488
489     This function checks various known paths that provide trusted CA
490     certificates, and returns the first one that exists. It checks:
491
492     * the path in the `SSL_CERT_FILE` environment variable (used by OpenSSL)
493     * `/etc/arvados/ca-certificates.crt`, respected by all Arvados software
494     * `/etc/ssl/certs/ca-certificates.crt`, the default store on Debian-based
495       distributions
496     * `/etc/pki/tls/certs/ca-bundle.crt`, the default store on Red Hat-based
497       distributions
498
499     If none of these paths exist, this function returns the value of `fallback`.
500
501     Arguments:
502
503     * fallback: T --- The value to return if none of the known paths exist.
504       The default value is the certificate store of Mozilla's trusted CAs
505       included with the Python [certifi][] package.
506
507     [certifi]: https://pypi.org/project/certifi/
508     """
509     for ca_certs_path in [
510         # SSL_CERT_FILE and SSL_CERT_DIR are openssl overrides - note
511         # that httplib2 itself also supports HTTPLIB2_CA_CERTS.
512         os.environ.get('SSL_CERT_FILE'),
513         # Arvados specific:
514         '/etc/arvados/ca-certificates.crt',
515         # Debian:
516         '/etc/ssl/certs/ca-certificates.crt',
517         # Red Hat:
518         '/etc/pki/tls/certs/ca-bundle.crt',
519         ]:
520         if ca_certs_path and os.path.exists(ca_certs_path):
521             return ca_certs_path
522     return fallback
523
524 def new_request_id() -> str:
525     """Return a random request ID
526
527     This function generates and returns a random string suitable for use as a
528     `X-Request-Id` header value in the Arvados API.
529     """
530     rid = "req-"
531     # 2**104 > 36**20 > 2**103
532     n = random.getrandbits(104)
533     for _ in range(20):
534         c = n % 36
535         if c < 10:
536             rid += chr(c+ord('0'))
537         else:
538             rid += chr(c+ord('a')-10)
539         n = n // 36
540     return rid
541
542 def get_config_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
543     """Return an Arvados cluster's configuration, with caching
544
545     This function gets and returns the Arvados configuration from the API
546     server. It caches the result on the client object and reuses it on any
547     future calls.
548
549     Arguments:
550
551     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
552       object to use to retrieve and cache the Arvados cluster configuration.
553     """
554     if not svc._rootDesc.get('resources').get('configs', False):
555         # Old API server version, no config export endpoint
556         return {}
557     if not hasattr(svc, '_cached_config'):
558         svc._cached_config = svc.configs().get().execute()
559     return svc._cached_config
560
561 def get_vocabulary_once(svc: 'arvados.api_resources.ArvadosAPIClient') -> Dict[str, Any]:
562     """Return an Arvados cluster's vocabulary, with caching
563
564     This function gets and returns the Arvados vocabulary from the API
565     server. It caches the result on the client object and reuses it on any
566     future calls.
567
568     .. HINT:: Low-level method
569        This is a relatively low-level wrapper around the Arvados API. Most
570        users will prefer to use `arvados.vocabulary.load_vocabulary`.
571
572     Arguments:
573
574     * svc: arvados.api_resources.ArvadosAPIClient --- The Arvados API client
575       object to use to retrieve and cache the Arvados cluster vocabulary.
576     """
577     if not svc._rootDesc.get('resources').get('vocabularies', False):
578         # Old API server version, no vocabulary export endpoint
579         return {}
580     if not hasattr(svc, '_cached_vocabulary'):
581         svc._cached_vocabulary = svc.vocabularies().get().execute()
582     return svc._cached_vocabulary
583
584 def trim_name(collectionname: str) -> str:
585     """Limit the length of a name to fit within Arvados API limits
586
587     This function ensures that a string is short enough to use as an object
588     name in the Arvados API, leaving room for text that may be added by the
589     `ensure_unique_name` argument. If the source name is short enough, it is
590     returned unchanged. Otherwise, this function returns a string with excess
591     characters removed from the middle of the source string and replaced with
592     an ellipsis.
593
594     Arguments:
595
596     * collectionname: str --- The desired source name
597     """
598     max_name_len = 254 - 28
599
600     if len(collectionname) > max_name_len:
601         over = len(collectionname) - max_name_len
602         split = int(max_name_len/2)
603         collectionname = collectionname[0:split] + "…" + collectionname[split+over:]
604
605     return collectionname