21935: Rename SafeHTTPCache to ThreadSafeHTTPCache
[arvados.git] / sdk / python / arvados / api.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados API client
5
6 The code in this module builds Arvados API client objects you can use to submit
7 Arvados API requests. This includes extending the underlying HTTP client with
8 niceties such as caching, X-Request-Id header for tracking, and more. The main
9 client constructors are `api` and `api_from_config`.
10 """
11
12 import collections
13 import errno
14 import hashlib
15 import httplib2
16 import json
17 import logging
18 import os
19 import pathlib
20 import re
21 import socket
22 import ssl
23 import sys
24 import tempfile
25 import threading
26 import time
27 import types
28
29 from typing import (
30     Any,
31     Dict,
32     List,
33     Mapping,
34     Optional,
35 )
36
37 import apiclient
38 import apiclient.http
39 from apiclient import discovery as apiclient_discovery
40 from apiclient import errors as apiclient_errors
41 from . import config
42 from . import errors
43 from . import keep
44 from . import retry
45 from . import util
46 from ._internal import basedirs
47 from .logging import GoogleHTTPClientFilter, log_handler
48
49 _logger = logging.getLogger('arvados.api')
50 _googleapiclient_log_lock = threading.Lock()
51
52 MAX_IDLE_CONNECTION_DURATION = 30
53 """
54 Number of seconds that API client HTTP connections should be allowed to idle
55 in keepalive state before they are forced closed. Client code can adjust this
56 constant, and it will be used for all Arvados API clients constructed after
57 that point.
58 """
59
60 # An unused HTTP 5xx status code to request a retry internally.
61 # See _intercept_http_request. This should not be user-visible.
62 _RETRY_4XX_STATUS = 545
63
64 if sys.version_info >= (3,):
65     httplib2.SSLHandshakeError = None
66
67 _orig_retry_request = apiclient.http._retry_request
68 def _retry_request(http, num_retries, *args, **kwargs):
69     try:
70         num_retries = max(num_retries, http.num_retries)
71     except AttributeError:
72         # `http` client object does not have a `num_retries` attribute.
73         # It apparently hasn't gone through _patch_http_request, possibly
74         # because this isn't an Arvados API client. Pass through to
75         # avoid interfering with other Google API clients.
76         return _orig_retry_request(http, num_retries, *args, **kwargs)
77     response, body = _orig_retry_request(http, num_retries, *args, **kwargs)
78     # If _intercept_http_request ran out of retries for a 4xx response,
79     # restore the original status code.
80     if response.status == _RETRY_4XX_STATUS:
81         response.status = int(response['status'])
82     return (response, body)
83 apiclient.http._retry_request = _retry_request
84
85 def _intercept_http_request(self, uri, method="GET", headers={}, **kwargs):
86     if not headers.get('X-Request-Id'):
87         headers['X-Request-Id'] = self._request_id()
88     try:
89         if (self.max_request_size and
90             kwargs.get('body') and
91             self.max_request_size < len(kwargs['body'])):
92             raise apiclient_errors.MediaUploadSizeError("Request size %i bytes exceeds published limit of %i bytes" % (len(kwargs['body']), self.max_request_size))
93
94         headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
95
96         if (time.time() - self._last_request_time) > self._max_keepalive_idle:
97             # High probability of failure due to connection atrophy. Make
98             # sure this request [re]opens a new connection by closing and
99             # forgetting all cached connections first.
100             for conn in self.connections.values():
101                 conn.close()
102             self.connections.clear()
103
104         self._last_request_time = time.time()
105         try:
106             response, body = self.orig_http_request(uri, method, headers=headers, **kwargs)
107         except ssl.CertificateError as e:
108             raise ssl.CertificateError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
109         # googleapiclient only retries 403, 429, and 5xx status codes.
110         # If we got another 4xx status that we want to retry, convert it into
111         # 5xx so googleapiclient handles it the way we want.
112         if response.status in retry._HTTP_CAN_RETRY and response.status < 500:
113             response.status = _RETRY_4XX_STATUS
114         return (response, body)
115     except Exception as e:
116         # Prepend "[request_id] " to the error message, which we
117         # assume is the first string argument passed to the exception
118         # constructor.
119         for i in range(len(e.args or ())):
120             if type(e.args[i]) == type(""):
121                 e.args = e.args[:i] + ("[{}] {}".format(headers['X-Request-Id'], e.args[i]),) + e.args[i+1:]
122                 raise type(e)(*e.args)
123         raise
124
125 def _patch_http_request(http, api_token, num_retries):
126     http.arvados_api_token = api_token
127     http.max_request_size = 0
128     http.num_retries = num_retries
129     http.orig_http_request = http.request
130     http.request = types.MethodType(_intercept_http_request, http)
131     http._last_request_time = 0
132     http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
133     http._request_id = util.new_request_id
134     return http
135
136 def _close_connections(self):
137     for conn in self._http.connections.values():
138         conn.close()
139
140 # Monkey patch discovery._cast() so objects and arrays get serialized
141 # with json.dumps() instead of str().
142 _cast_orig = apiclient_discovery._cast
143 def _cast_objects_too(value, schema_type):
144     global _cast_orig
145     if (type(value) != type('') and
146         type(value) != type(b'') and
147         (schema_type == 'object' or schema_type == 'array')):
148         return json.dumps(value)
149     else:
150         return _cast_orig(value, schema_type)
151 apiclient_discovery._cast = _cast_objects_too
152
153 # Convert apiclient's HttpErrors into our own API error subclass for better
154 # error reporting.
155 # Reassigning apiclient_errors.HttpError is not sufficient because most of the
156 # apiclient submodules import the class into their own namespace.
157 def _new_http_error(cls, *args, **kwargs):
158     return super(apiclient_errors.HttpError, cls).__new__(
159         errors.ApiError, *args, **kwargs)
160 apiclient_errors.HttpError.__new__ = staticmethod(_new_http_error)
161
162 class ThreadSafeHTTPCache:
163     """Thread-safe replacement for `httplib2.FileCache`
164
165     `arvados.api.http_cache` is the preferred way to construct this object.
166     Refer to that function's docstring for details.
167     """
168
169     def __init__(self, path=None, max_age=None):
170         self._dir = path
171         if max_age is not None:
172             try:
173                 self._clean(threshold=time.time() - max_age)
174             except:
175                 pass
176
177     def _clean(self, threshold=0):
178         for ent in os.listdir(self._dir):
179             fnm = os.path.join(self._dir, ent)
180             if os.path.isdir(fnm) or not fnm.endswith('.tmp'):
181                 continue
182             stat = os.lstat(fnm)
183             if stat.st_mtime < threshold:
184                 try:
185                     os.unlink(fnm)
186                 except OSError as err:
187                     if err.errno != errno.ENOENT:
188                         raise
189
190     def __str__(self):
191         return self._dir
192
193     def _filename(self, url):
194         return os.path.join(self._dir, hashlib.md5(url.encode('utf-8')).hexdigest()+'.tmp')
195
196     def get(self, url):
197         filename = self._filename(url)
198         try:
199             with open(filename, 'rb') as f:
200                 return f.read()
201         except (IOError, OSError):
202             return None
203
204     def set(self, url, content):
205         try:
206             fd, tempname = tempfile.mkstemp(dir=self._dir)
207         except:
208             return None
209         try:
210             try:
211                 f = os.fdopen(fd, 'wb')
212             except:
213                 os.close(fd)
214                 raise
215             try:
216                 f.write(content)
217             finally:
218                 f.close()
219             os.rename(tempname, self._filename(url))
220             tempname = None
221         finally:
222             if tempname:
223                 os.unlink(tempname)
224
225     def delete(self, url):
226         try:
227             os.unlink(self._filename(url))
228         except OSError as err:
229             if err.errno != errno.ENOENT:
230                 raise
231
232
233 class ThreadSafeAPIClient(object):
234     """Thread-safe wrapper for an Arvados API client
235
236     This class takes all the arguments necessary to build a lower-level
237     Arvados API client `googleapiclient.discovery.Resource`, then
238     transparently builds and wraps a unique object per thread. This works
239     around the fact that the client's underlying HTTP client object is not
240     thread-safe.
241
242     Arguments:
243
244     * apiconfig: Mapping[str, str] | None --- A mapping with entries for
245       `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
246       `ARVADOS_API_HOST_INSECURE`. If not provided, uses
247       `arvados.config.settings` to get these parameters from user
248       configuration.  You can pass an empty mapping to build the client
249       solely from `api_params`.
250
251     * keep_params: Mapping[str, Any] --- Keyword arguments used to construct
252       an associated `arvados.keep.KeepClient`.
253
254     * api_params: Mapping[str, Any] --- Keyword arguments used to construct
255       each thread's API client. These have the same meaning as in the
256       `arvados.api.api` function.
257
258     * version: str | None --- A string naming the version of the Arvados API
259       to use. If not specified, the code will log a warning and fall back to
260       `'v1'`.
261     """
262     def __init__(
263             self,
264             apiconfig: Optional[Mapping[str, str]]=None,
265             keep_params: Optional[Mapping[str, Any]]={},
266             api_params: Optional[Mapping[str, Any]]={},
267             version: Optional[str]=None,
268     ) -> None:
269         if apiconfig or apiconfig is None:
270             self._api_kwargs = api_kwargs_from_config(version, apiconfig, **api_params)
271         else:
272             self._api_kwargs = normalize_api_kwargs(version, **api_params)
273         self.api_token = self._api_kwargs['token']
274         self.request_id = self._api_kwargs.get('request_id')
275         self.local = threading.local()
276         self.keep = keep.KeepClient(api_client=self, **keep_params)
277
278     def localapi(self) -> 'googleapiclient.discovery.Resource':
279         try:
280             client = self.local.api
281         except AttributeError:
282             client = api_client(**self._api_kwargs)
283             client._http._request_id = lambda: self.request_id or util.new_request_id()
284             self.local.api = client
285         return client
286
287     def __getattr__(self, name: str) -> Any:
288         # Proxy nonexistent attributes to the thread-local API client.
289         return getattr(self.localapi(), name)
290
291
292 def http_cache(data_type: str) -> Optional[ThreadSafeHTTPCache]:
293     """Set up an HTTP file cache
294
295     This function constructs and returns an `arvados.api.ThreadSafeHTTPCache`
296     backed by the filesystem under a cache directory from the environment, or
297     `None` if the directory cannot be set up. The return value can be passed to
298     `httplib2.Http` as the `cache` argument.
299
300     Arguments:
301
302     * data_type: str --- The name of the subdirectory
303       where data is cached.
304     """
305     try:
306         path = basedirs.BaseDirectories('CACHE').storage_path(data_type)
307     except (OSError, RuntimeError):
308         return None
309     else:
310         return ThreadSafeHTTPCache(str(path), max_age=60*60*24*2)
311
312 def api_client(
313         version: str,
314         discoveryServiceUrl: str,
315         token: str,
316         *,
317         cache: bool=True,
318         http: Optional[httplib2.Http]=None,
319         insecure: bool=False,
320         num_retries: int=10,
321         request_id: Optional[str]=None,
322         timeout: int=5*60,
323         **kwargs: Any,
324 ) -> apiclient_discovery.Resource:
325     """Build an Arvados API client
326
327     This function returns a `googleapiclient.discovery.Resource` object
328     constructed from the given arguments. This is a relatively low-level
329     interface that requires all the necessary inputs as arguments. Most
330     users will prefer to use `api` which can accept more flexible inputs.
331
332     Arguments:
333
334     * version: str --- A string naming the version of the Arvados API to use.
335
336     * discoveryServiceUrl: str --- The URL used to discover APIs passed
337       directly to `googleapiclient.discovery.build`.
338
339     * token: str --- The authentication token to send with each API call.
340
341     Keyword-only arguments:
342
343     * cache: bool --- If true, loads the API discovery document from, or
344       saves it to, a cache on disk.
345
346     * http: httplib2.Http | None --- The HTTP client object the API client
347       object will use to make requests.  If not provided, this function will
348       build its own to use. Either way, the object will be patched as part
349       of the build process.
350
351     * insecure: bool --- If true, ignore SSL certificate validation
352       errors. Default `False`.
353
354     * num_retries: int --- The number of times to retry each API request if
355       it encounters a temporary failure. Default 10.
356
357     * request_id: str | None --- Default `X-Request-Id` header value for
358       outgoing requests that don't already provide one. If `None` or
359       omitted, generate a random ID. When retrying failed requests, the same
360       ID is used on all attempts.
361
362     * timeout: int --- A timeout value for HTTP requests in seconds. Default
363       300 (5 minutes).
364
365     Additional keyword arguments will be passed directly to
366     `googleapiclient.discovery.build`.
367     """
368     if http is None:
369         http = httplib2.Http(
370             ca_certs=util.ca_certs_path(),
371             cache=http_cache('discovery') if cache else None,
372             disable_ssl_certificate_validation=bool(insecure),
373         )
374     if http.timeout is None:
375         http.timeout = timeout
376     http = _patch_http_request(http, token, num_retries)
377
378     # The first time a client is instantiated, temporarily route
379     # googleapiclient.http retry logs if they're not already. These are
380     # important because temporary problems fetching the discovery document
381     # can cause clients to appear to hang early. This can be removed after
382     # we have a more general story for handling googleapiclient logs (#20521).
383     client_logger = logging.getLogger('googleapiclient.http')
384     # "first time a client is instantiated" = thread that acquires this lock
385     # It is never released.
386     # googleapiclient sets up its own NullHandler so we detect if logging is
387     # configured by looking for a real handler anywhere in the hierarchy.
388     client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all(
389         isinstance(handler, logging.NullHandler)
390         for logger_name in ['', 'googleapiclient', 'googleapiclient.http']
391         for handler in logging.getLogger(logger_name).handlers
392     )
393     if client_logger_unconfigured:
394         client_level = client_logger.level
395         client_filter = GoogleHTTPClientFilter()
396         client_logger.addFilter(client_filter)
397         client_logger.addHandler(log_handler)
398         if logging.NOTSET < client_level < client_filter.retry_levelno:
399             client_logger.setLevel(client_level)
400         else:
401             client_logger.setLevel(client_filter.retry_levelno)
402     try:
403         svc = apiclient_discovery.build(
404             'arvados', version,
405             cache_discovery=False,
406             discoveryServiceUrl=discoveryServiceUrl,
407             http=http,
408             num_retries=num_retries,
409             **kwargs,
410         )
411     finally:
412         if client_logger_unconfigured:
413             client_logger.removeHandler(log_handler)
414             client_logger.removeFilter(client_filter)
415             client_logger.setLevel(client_level)
416     svc.api_token = token
417     svc.insecure = insecure
418     svc.request_id = request_id
419     svc.config = lambda: util.get_config_once(svc)
420     svc.vocabulary = lambda: util.get_vocabulary_once(svc)
421     svc.close_connections = types.MethodType(_close_connections, svc)
422     http.max_request_size = svc._rootDesc.get('maxRequestSize', 0)
423     http.cache = None
424     http._request_id = lambda: svc.request_id or util.new_request_id()
425     return svc
426
427 def normalize_api_kwargs(
428         version: Optional[str]=None,
429         discoveryServiceUrl: Optional[str]=None,
430         host: Optional[str]=None,
431         token: Optional[str]=None,
432         **kwargs: Any,
433 ) -> Dict[str, Any]:
434     """Validate kwargs from `api` and build kwargs for `api_client`
435
436     This method takes high-level keyword arguments passed to the `api`
437     constructor and normalizes them into a new dictionary that can be passed
438     as keyword arguments to `api_client`. It raises `ValueError` if required
439     arguments are missing or conflict.
440
441     Arguments:
442
443     * version: str | None --- A string naming the version of the Arvados API
444       to use. If not specified, the code will log a warning and fall back to
445       'v1'.
446
447     * discoveryServiceUrl: str | None --- The URL used to discover APIs
448       passed directly to `googleapiclient.discovery.build`. It is an error
449       to pass both `discoveryServiceUrl` and `host`.
450
451     * host: str | None --- The hostname and optional port number of the
452       Arvados API server. Used to build `discoveryServiceUrl`. It is an
453       error to pass both `discoveryServiceUrl` and `host`.
454
455     * token: str --- The authentication token to send with each API call.
456
457     Additional keyword arguments will be included in the return value.
458     """
459     if discoveryServiceUrl and host:
460         raise ValueError("both discoveryServiceUrl and host provided")
461     elif discoveryServiceUrl:
462         url_src = "discoveryServiceUrl"
463     elif host:
464         url_src = "host argument"
465         discoveryServiceUrl = 'https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' % (host,)
466     elif token:
467         # This specific error message gets priority for backwards compatibility.
468         raise ValueError("token argument provided, but host missing.")
469     else:
470         raise ValueError("neither discoveryServiceUrl nor host provided")
471     if not token:
472         raise ValueError("%s provided, but token missing" % (url_src,))
473     if not version:
474         version = 'v1'
475         _logger.info(
476             "Using default API version. Call arvados.api(%r) instead.",
477             version,
478         )
479     return {
480         'discoveryServiceUrl': discoveryServiceUrl,
481         'token': token,
482         'version': version,
483         **kwargs,
484     }
485
486 def api_kwargs_from_config(
487         version: Optional[str]=None,
488         apiconfig: Optional[Mapping[str, str]]=None,
489         **kwargs: Any
490 ) -> Dict[str, Any]:
491     """Build `api_client` keyword arguments from configuration
492
493     This function accepts a mapping with Arvados configuration settings like
494     `ARVADOS_API_HOST` and converts them into a mapping of keyword arguments
495     that can be passed to `api_client`. If `ARVADOS_API_HOST` or
496     `ARVADOS_API_TOKEN` are not configured, it raises `ValueError`.
497
498     Arguments:
499
500     * version: str | None --- A string naming the version of the Arvados API
501       to use. If not specified, the code will log a warning and fall back to
502       'v1'.
503
504     * apiconfig: Mapping[str, str] | None --- A mapping with entries for
505       `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
506       `ARVADOS_API_HOST_INSECURE`. If not provided, calls
507       `arvados.config.settings` to get these parameters from user
508       configuration.
509
510     Additional keyword arguments will be included in the return value.
511     """
512     if apiconfig is None:
513         apiconfig = config.settings()
514     missing = " and ".join(
515         key
516         for key in ['ARVADOS_API_HOST', 'ARVADOS_API_TOKEN']
517         if key not in apiconfig
518     )
519     if missing:
520         raise ValueError(
521             "%s not set.\nPlease set in %s or export environment variable." %
522             (missing, config.default_config_file),
523         )
524     return normalize_api_kwargs(
525         version,
526         None,
527         apiconfig['ARVADOS_API_HOST'],
528         apiconfig['ARVADOS_API_TOKEN'],
529         insecure=config.flag_is_true('ARVADOS_API_HOST_INSECURE', apiconfig),
530         **kwargs,
531     )
532
533 def api(
534         version: Optional[str]=None,
535         cache: bool=True,
536         host: Optional[str]=None,
537         token: Optional[str]=None,
538         insecure: bool=False,
539         request_id: Optional[str]=None,
540         timeout: int=5*60,
541         *,
542         discoveryServiceUrl: Optional[str]=None,
543         **kwargs: Any,
544 ) -> ThreadSafeAPIClient:
545     """Dynamically build an Arvados API client
546
547     This function provides a high-level "do what I mean" interface to build an
548     Arvados API client object. You can call it with no arguments to build a
549     client from user configuration; pass `host` and `token` arguments just
550     like you would write in user configuration; or pass additional arguments
551     for lower-level control over the client.
552
553     This function returns a `arvados.api.ThreadSafeAPIClient`, an
554     API-compatible wrapper around `googleapiclient.discovery.Resource`. If
555     you're handling concurrency yourself and/or your application is very
556     performance-sensitive, consider calling `api_client` directly.
557
558     Arguments:
559
560     * version: str | None --- A string naming the version of the Arvados API
561       to use. If not specified, the code will log a warning and fall back to
562       'v1'.
563
564     * host: str | None --- The hostname and optional port number of the
565       Arvados API server.
566
567     * token: str | None --- The authentication token to send with each API
568       call.
569
570     * discoveryServiceUrl: str | None --- The URL used to discover APIs
571       passed directly to `googleapiclient.discovery.build`.
572
573     If `host`, `token`, and `discoveryServiceUrl` are all omitted, `host` and
574     `token` will be loaded from the user's configuration. Otherwise, you must
575     pass `token` and one of `host` or `discoveryServiceUrl`. It is an error to
576     pass both `host` and `discoveryServiceUrl`.
577
578     Other arguments are passed directly to `api_client`. See that function's
579     docstring for more information about their meaning.
580     """
581     kwargs.update(
582         cache=cache,
583         insecure=insecure,
584         request_id=request_id,
585         timeout=timeout,
586     )
587     if discoveryServiceUrl or host or token:
588         kwargs.update(normalize_api_kwargs(version, discoveryServiceUrl, host, token))
589     else:
590         kwargs.update(api_kwargs_from_config(version))
591     version = kwargs.pop('version')
592     return ThreadSafeAPIClient({}, {}, kwargs, version)
593
594 def api_from_config(
595         version: Optional[str]=None,
596         apiconfig: Optional[Mapping[str, str]]=None,
597         **kwargs: Any
598 ) -> ThreadSafeAPIClient:
599     """Build an Arvados API client from a configuration mapping
600
601     This function builds an Arvados API client from a mapping with user
602     configuration. It accepts that mapping as an argument, so you can use a
603     configuration that's different from what the user has set up.
604
605     This function returns a `arvados.api.ThreadSafeAPIClient`, an
606     API-compatible wrapper around `googleapiclient.discovery.Resource`. If
607     you're handling concurrency yourself and/or your application is very
608     performance-sensitive, consider calling `api_client` directly.
609
610     Arguments:
611
612     * version: str | None --- A string naming the version of the Arvados API
613       to use. If not specified, the code will log a warning and fall back to
614       'v1'.
615
616     * apiconfig: Mapping[str, str] | None --- A mapping with entries for
617       `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
618       `ARVADOS_API_HOST_INSECURE`. If not provided, calls
619       `arvados.config.settings` to get these parameters from user
620       configuration.
621
622     Other arguments are passed directly to `api_client`. See that function's
623     docstring for more information about their meaning.
624     """
625     return api(**api_kwargs_from_config(version, apiconfig, **kwargs))