1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 The code in this module builds Arvados API client objects you can use to submit
7 Arvados API requests. This includes extending the underlying HTTP client with
8 niceties such as caching, X-Request-Id header for tracking, and more. The main
9 client constructors are `api` and `api_from_config`.
39 from apiclient import discovery as apiclient_discovery
40 from apiclient import errors as apiclient_errors
46 from ._internal import basedirs
47 from .logging import GoogleHTTPClientFilter, log_handler
49 _logger = logging.getLogger('arvados.api')
50 _googleapiclient_log_lock = threading.Lock()
52 MAX_IDLE_CONNECTION_DURATION = 30
54 Number of seconds that API client HTTP connections should be allowed to idle
55 in keepalive state before they are forced closed. Client code can adjust this
56 constant, and it will be used for all Arvados API clients constructed after
60 # An unused HTTP 5xx status code to request a retry internally.
61 # See _intercept_http_request. This should not be user-visible.
62 _RETRY_4XX_STATUS = 545
# This module only runs on Python 3 (it uses `raise ... from None` syntax),
# so the `sys.version_info >= (3,)` check that used to guard this assignment
# was always true and has been dropped.
# NOTE(review): why this attribute is cleared is not visible in this module;
# presumably a compatibility shim for code that references it -- confirm
# before removing the assignment itself.
httplib2.SSLHandshakeError = None
_orig_retry_request = apiclient.http._retry_request
def _retry_request(http, num_retries, *args, **kwargs):
    """Arvados-aware replacement for `apiclient.http._retry_request`.

    For HTTP objects patched by `_patch_http_request` (those that carry a
    `num_retries` attribute), retry at least `http.num_retries` times. If
    retries ran out on a response whose status `_intercept_http_request`
    rewrote to `_RETRY_4XX_STATUS`, restore the original status before
    returning. HTTP objects without `num_retries` are handed straight to
    the original implementation so other Google API clients are unaffected.
    """
    try:
        effective_retries = max(num_retries, http.num_retries)
    except AttributeError:
        # Not an Arvados-patched HTTP object: pass through untouched.
        return _orig_retry_request(http, num_retries, *args, **kwargs)
    resp, content = _orig_retry_request(http, effective_retries, *args, **kwargs)
    if resp.status == _RETRY_4XX_STATUS:
        # Only `.status` was rewritten; the original code is still
        # available from the response's 'status' entry.
        resp.status = int(resp['status'])
    return (resp, content)
apiclient.http._retry_request = _retry_request
def _intercept_http_request(self, uri, method="GET", headers=None, **kwargs):
    """Send one HTTP request with Arvados-specific handling.

    `_patch_http_request` installs this as the `request` method of an
    `httplib2.Http` object, so `self` is that object and the original
    method is available as `self.orig_http_request`.

    Before sending, this:

    * sets an `X-Request-Id` header from `self._request_id()` unless the
      caller already supplied a non-empty one;
    * sets the `Authorization` header from `self.arvados_api_token`;
    * rejects bodies larger than `self.max_request_size` (when nonzero);
    * closes and forgets all cached connections if the last request was
      more than `self._max_keepalive_idle` seconds ago.

    Returns the `(response, body)` pair from the original method. A
    response whose status is in `retry._HTTP_CAN_RETRY` but below 500 has
    its status rewritten to `_RETRY_4XX_STATUS` so googleapiclient retries
    it.

    Raises:

    * apiclient_errors.MediaUploadSizeError --- the request body exceeds
      the published maximum request size.
    * ssl.CertificateError --- certificate validation failed; the message
      names the URI and likely causes.

    Any exception raised here is re-raised as a new instance of the same
    type with "[request_id] " prefixed to its first string argument;
    exceptions without a string argument are re-raised unchanged.
    """
    # Use a fresh dict when no headers are given. This used to be a shared
    # `{}` default; because this function writes into `headers`, the
    # X-Request-Id generated for the first header-less call was then reused
    # by every later header-less call.
    if headers is None:
        headers = {}
    # The caller's dict is updated in place rather than copied.
    # NOTE(review): presumably this is what keeps a single X-Request-Id
    # across retries of one request; confirm before switching to a copy.
    if not headers.get('X-Request-Id'):
        headers['X-Request-Id'] = self._request_id()
    try:
        if (self.max_request_size and
            kwargs.get('body') and
            self.max_request_size < len(kwargs['body'])):
            raise apiclient_errors.MediaUploadSizeError("Request size %i bytes exceeds published limit of %i bytes" % (len(kwargs['body']), self.max_request_size))

        headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token

        if (time.time() - self._last_request_time) > self._max_keepalive_idle:
            # High probability of failure due to connection atrophy. Make
            # sure this request [re]opens a new connection by closing and
            # forgetting all cached connections first.
            for conn in self.connections.values():
                conn.close()
            self.connections.clear()

        self._last_request_time = time.time()
        try:
            response, body = self.orig_http_request(uri, method, headers=headers, **kwargs)
        except ssl.CertificateError as e:
            raise ssl.CertificateError(e.args[0], "Could not connect to %s\n%s\nPossible causes: remote SSL/TLS certificate expired, or was issued by an untrusted certificate authority." % (uri, e)) from None
        # googleapiclient only retries 403, 429, and 5xx status codes.
        # If we got another 4xx status that we want to retry, convert it into
        # 5xx so googleapiclient handles it the way we want.
        if response.status in retry._HTTP_CAN_RETRY and response.status < 500:
            response.status = _RETRY_4XX_STATUS
        return (response, body)
    except Exception as e:
        # Prepend "[request_id] " to the error message, which we assume is
        # the first string argument passed to the exception constructor.
        for i, arg in enumerate(e.args or ()):
            if type(arg) is str:
                e.args = e.args[:i] + ("[{}] {}".format(headers['X-Request-Id'], arg),) + e.args[i+1:]
                raise type(e)(*e.args)
        # No string argument to annotate: re-raise as-is.
        raise
def _patch_http_request(http, api_token, num_retries):
    """Install Arvados request handling on an `httplib2.Http` object.

    The object's `request` method is replaced by `_intercept_http_request`
    (the original is kept as `orig_http_request`), and the per-client state
    that the interceptor and `_retry_request` read is attached to it.

    Returns the same `http` object, modified in place.
    """
    # Keep the original method before replacing it; the interceptor
    # delegates to it.
    http.orig_http_request = http.request
    http.request = types.MethodType(_intercept_http_request, http)

    # Per-client settings.
    http.arvados_api_token = api_token
    http.num_retries = num_retries
    http.max_request_size = 0
    http._request_id = util.new_request_id

    # Keepalive bookkeeping. Starting from 0 makes the very first request
    # look like it follows a long idle period.
    http._last_request_time = 0
    http._max_keepalive_idle = MAX_IDLE_CONNECTION_DURATION
    return http
def _close_connections(self):
    """Close every connection cached by this client's HTTP object.

    `api_client` binds this to each API client it builds as the
    `close_connections` method, so `self` is the API client and the cached
    connections live on its `_http` attribute. The cache mapping itself is
    left as-is.
    """
    cached = self._http.connections
    for connection in cached.values():
        connection.close()
# Monkey patch discovery._cast() so objects and arrays get serialized
# with json.dumps() instead of str().
_cast_orig = apiclient_discovery._cast
def _cast_objects_too(value, schema_type):
    """Cast `value` for a discovery-document parameter of `schema_type`.

    Values bound for `object` or `array` parameters are JSON-encoded,
    unless they are already exactly `str` or `bytes`. Everything else goes
    through googleapiclient's original `_cast`.
    """
    already_encoded = type(value) in (str, bytes)
    if not already_encoded and schema_type in ('object', 'array'):
        return json.dumps(value)
    return _cast_orig(value, schema_type)
apiclient_discovery._cast = _cast_objects_too
# Convert apiclient's HttpErrors into our own API error subclass for better
# error reporting.
# Reassigning apiclient_errors.HttpError is not sufficient because most of the
# apiclient submodules import the class into their own namespace, so the
# class's allocator is patched instead.
def _new_http_error(cls, *args, **kwargs):
    """Allocate an `errors.ApiError` whenever an `HttpError` is created.

    Installed as `apiclient_errors.HttpError.__new__`. Allocation is
    delegated to the next `__new__` after `HttpError` in `cls`'s MRO, but
    the object allocated is of type `errors.ApiError`.
    """
    next_in_mro = super(apiclient_errors.HttpError, cls)
    return next_in_mro.__new__(errors.ApiError, *args, **kwargs)
apiclient_errors.HttpError.__new__ = staticmethod(_new_http_error)
class ThreadSafeHTTPCache:
    """Thread-safe replacement for `httplib2.FileCache`

    `arvados.api.http_cache` is the preferred way to construct this object.
    Refer to that function's docstring for details.

    Each URL is cached in its own file directly under the cache directory,
    named by the MD5 hex digest of the URL plus a `.tmp` suffix. New content
    is written to a temporary file and then renamed over the entry, so (on
    POSIX filesystems) readers should never see a partially written entry.
    """

    def __init__(self, path=None, max_age=None):
        """Use `path` as the cache directory.

        If `max_age` (seconds) is given, entries not modified within that
        many seconds are deleted now. That cleanup is best-effort: any
        error it raises is ignored so the cache remains usable.
        """
        self._dir = path
        if max_age is not None:
            try:
                self._clean(threshold=time.time() - max_age)
            except Exception:
                # Best-effort cleanup. Unlike the former bare `except:`,
                # this still lets KeyboardInterrupt/SystemExit through.
                pass

    def _clean(self, threshold=0):
        """Delete `.tmp` entry files whose mtime is before `threshold`.

        Directories and files without the `.tmp` suffix are left alone.
        Files that disappear mid-scan (e.g. deleted by another process) are
        skipped instead of aborting the rest of the cleanup.
        """
        for ent in os.listdir(self._dir):
            fnm = os.path.join(self._dir, ent)
            if os.path.isdir(fnm) or not fnm.endswith('.tmp'):
                continue
            try:
                # lstat is inside the try as well: the file may vanish
                # between listdir() and here.
                if os.lstat(fnm).st_mtime < threshold:
                    os.unlink(fnm)
            except OSError as err:
                if err.errno != errno.ENOENT:
                    raise

    def __str__(self):
        return self._dir

    def _filename(self, url):
        # MD5 is used only to derive a fixed-length, filesystem-safe name.
        return os.path.join(self._dir, hashlib.md5(url.encode('utf-8')).hexdigest()+'.tmp')

    def get(self, url):
        """Return the cached content for `url` as bytes, or None if absent or unreadable."""
        filename = self._filename(url)
        try:
            with open(filename, 'rb') as f:
                return f.read()
        except OSError:
            return None

    def set(self, url, content):
        """Store `content` (bytes) as the cache entry for `url`.

        If the temporary file cannot be created (e.g. the cache directory
        is missing), nothing is stored and no error is raised. Errors after
        that point propagate, and the temporary file is removed.
        """
        try:
            fd, tempname = tempfile.mkstemp(dir=self._dir)
        except Exception:
            return None
        try:
            try:
                f = os.fdopen(fd, 'wb')
            except BaseException:
                # fdopen did not take ownership of the descriptor.
                os.close(fd)
                raise
            with f:
                f.write(content)
            # os.replace, unlike os.rename, also overwrites an existing
            # entry on Windows.
            os.replace(tempname, self._filename(url))
            tempname = None
        finally:
            if tempname:
                os.unlink(tempname)

    def delete(self, url):
        """Remove the cache entry for `url`; a missing entry is not an error."""
        try:
            os.unlink(self._filename(url))
        except OSError as err:
            if err.errno != errno.ENOENT:
                raise
233 class ThreadSafeAPIClient(object):
234 """Thread-safe wrapper for an Arvados API client
236 This class takes all the arguments necessary to build a lower-level
237 Arvados API client `googleapiclient.discovery.Resource`, then
238 transparently builds and wraps a unique object per thread. This works
239 around the fact that the client's underlying HTTP client object is not
244 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
245 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
246 `ARVADOS_API_HOST_INSECURE`. If not provided, uses
247 `arvados.config.settings` to get these parameters from user
248 configuration. You can pass an empty mapping to build the client
249 solely from `api_params`.
251 * keep_params: Mapping[str, Any] --- Keyword arguments used to construct
252 an associated `arvados.keep.KeepClient`.
254 * api_params: Mapping[str, Any] --- Keyword arguments used to construct
255 each thread's API client. These have the same meaning as in the
256 `arvados.api.api` function.
258 * version: str | None --- A string naming the version of the Arvados API
259 to use. If not specified, the code will log a warning and fall back to
264 apiconfig: Optional[Mapping[str, str]]=None,
265 keep_params: Optional[Mapping[str, Any]]={},
266 api_params: Optional[Mapping[str, Any]]={},
267 version: Optional[str]=None,
269 if apiconfig or apiconfig is None:
270 self._api_kwargs = api_kwargs_from_config(version, apiconfig, **api_params)
272 self._api_kwargs = normalize_api_kwargs(version, **api_params)
273 self.api_token = self._api_kwargs['token']
274 self.request_id = self._api_kwargs.get('request_id')
275 self.local = threading.local()
276 self.keep = keep.KeepClient(api_client=self, **keep_params)
278 def localapi(self) -> 'googleapiclient.discovery.Resource':
280 client = self.local.api
281 except AttributeError:
282 client = api_client(**self._api_kwargs)
283 client._http._request_id = lambda: self.request_id or util.new_request_id()
284 self.local.api = client
287 def __getattr__(self, name: str) -> Any:
288 # Proxy nonexistent attributes to the thread-local API client.
289 return getattr(self.localapi(), name)
def http_cache(data_type: str) -> Optional[ThreadSafeHTTPCache]:
    """Set up an HTTP file cache

    Return an `arvados.api.ThreadSafeHTTPCache` stored in the `data_type`
    subdirectory of the cache directory located by
    `basedirs.BaseDirectories('CACHE')`, or `None` if that directory cannot
    be set up. Entries older than two days are pruned when the cache is
    created. The result can be passed to `httplib2.Http` as the `cache`
    argument.

    Arguments:

    * data_type: str --- The name of the subdirectory where data is cached.
    """
    try:
        cache_dirs = basedirs.BaseDirectories('CACHE')
        path = cache_dirs.storage_path(data_type)
    except (OSError, RuntimeError):
        return None
    two_days = 2 * 24 * 60 * 60
    return ThreadSafeHTTPCache(str(path), max_age=two_days)
314 discoveryServiceUrl: str,
318 http: Optional[httplib2.Http]=None,
319 insecure: bool=False,
321 request_id: Optional[str]=None,
324 ) -> apiclient_discovery.Resource:
325 """Build an Arvados API client
327 This function returns a `googleapiclient.discovery.Resource` object
328 constructed from the given arguments. This is a relatively low-level
329 interface that requires all the necessary inputs as arguments. Most
330 users will prefer to use `api` which can accept more flexible inputs.
334 * version: str --- A string naming the version of the Arvados API to use.
336 * discoveryServiceUrl: str --- The URL used to discover APIs passed
337 directly to `googleapiclient.discovery.build`.
339 * token: str --- The authentication token to send with each API call.
341 Keyword-only arguments:
343 * cache: bool --- If true, loads the API discovery document from, or
344 saves it to, a cache on disk.
346 * http: httplib2.Http | None --- The HTTP client object the API client
347 object will use to make requests. If not provided, this function will
348 build its own to use. Either way, the object will be patched as part
349 of the build process.
351 * insecure: bool --- If true, ignore SSL certificate validation
352 errors. Default `False`.
354 * num_retries: int --- The number of times to retry each API request if
355 it encounters a temporary failure. Default 10.
357 * request_id: str | None --- Default `X-Request-Id` header value for
358 outgoing requests that don't already provide one. If `None` or
359 omitted, generate a random ID. When retrying failed requests, the same
360 ID is used on all attempts.
362 * timeout: int --- A timeout value for HTTP requests in seconds. Default
365 Additional keyword arguments will be passed directly to
366 `googleapiclient.discovery.build`.
369 http = httplib2.Http(
370 ca_certs=util.ca_certs_path(),
371 cache=http_cache('discovery') if cache else None,
372 disable_ssl_certificate_validation=bool(insecure),
374 if http.timeout is None:
375 http.timeout = timeout
376 http = _patch_http_request(http, token, num_retries)
378 # The first time a client is instantiated, temporarily route
379 # googleapiclient.http retry logs if they're not already. These are
380 # important because temporary problems fetching the discovery document
381 # can cause clients to appear to hang early. This can be removed after
382 # we have a more general story for handling googleapiclient logs (#20521).
383 client_logger = logging.getLogger('googleapiclient.http')
384 # "first time a client is instantiated" = thread that acquires this lock
385 # It is never released.
386 # googleapiclient sets up its own NullHandler so we detect if logging is
387 # configured by looking for a real handler anywhere in the hierarchy.
388 client_logger_unconfigured = _googleapiclient_log_lock.acquire(blocking=False) and all(
389 isinstance(handler, logging.NullHandler)
390 for logger_name in ['', 'googleapiclient', 'googleapiclient.http']
391 for handler in logging.getLogger(logger_name).handlers
393 if client_logger_unconfigured:
394 client_level = client_logger.level
395 client_filter = GoogleHTTPClientFilter()
396 client_logger.addFilter(client_filter)
397 client_logger.addHandler(log_handler)
398 if logging.NOTSET < client_level < client_filter.retry_levelno:
399 client_logger.setLevel(client_level)
401 client_logger.setLevel(client_filter.retry_levelno)
403 svc = apiclient_discovery.build(
405 cache_discovery=False,
406 discoveryServiceUrl=discoveryServiceUrl,
408 num_retries=num_retries,
412 if client_logger_unconfigured:
413 client_logger.removeHandler(log_handler)
414 client_logger.removeFilter(client_filter)
415 client_logger.setLevel(client_level)
416 svc.api_token = token
417 svc.insecure = insecure
418 svc.request_id = request_id
419 svc.config = lambda: util.get_config_once(svc)
420 svc.vocabulary = lambda: util.get_vocabulary_once(svc)
421 svc.close_connections = types.MethodType(_close_connections, svc)
422 http.max_request_size = svc._rootDesc.get('maxRequestSize', 0)
424 http._request_id = lambda: svc.request_id or util.new_request_id()
427 def normalize_api_kwargs(
428 version: Optional[str]=None,
429 discoveryServiceUrl: Optional[str]=None,
430 host: Optional[str]=None,
431 token: Optional[str]=None,
434 """Validate kwargs from `api` and build kwargs for `api_client`
436 This method takes high-level keyword arguments passed to the `api`
437 constructor and normalizes them into a new dictionary that can be passed
438 as keyword arguments to `api_client`. It raises `ValueError` if required
439 arguments are missing or conflict.
443 * version: str | None --- A string naming the version of the Arvados API
444 to use. If not specified, the code will log a warning and fall back to
447 * discoveryServiceUrl: str | None --- The URL used to discover APIs
448 passed directly to `googleapiclient.discovery.build`. It is an error
449 to pass both `discoveryServiceUrl` and `host`.
451 * host: str | None --- The hostname and optional port number of the
452 Arvados API server. Used to build `discoveryServiceUrl`. It is an
453 error to pass both `discoveryServiceUrl` and `host`.
455 * token: str --- The authentication token to send with each API call.
457 Additional keyword arguments will be included in the return value.
459 if discoveryServiceUrl and host:
460 raise ValueError("both discoveryServiceUrl and host provided")
461 elif discoveryServiceUrl:
462 url_src = "discoveryServiceUrl"
464 url_src = "host argument"
465 discoveryServiceUrl = 'https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' % (host,)
467 # This specific error message gets priority for backwards compatibility.
468 raise ValueError("token argument provided, but host missing.")
470 raise ValueError("neither discoveryServiceUrl nor host provided")
472 raise ValueError("%s provided, but token missing" % (url_src,))
476 "Using default API version. Call arvados.api(%r) instead.",
480 'discoveryServiceUrl': discoveryServiceUrl,
486 def api_kwargs_from_config(
487 version: Optional[str]=None,
488 apiconfig: Optional[Mapping[str, str]]=None,
491 """Build `api_client` keyword arguments from configuration
493 This function accepts a mapping with Arvados configuration settings like
494 `ARVADOS_API_HOST` and converts them into a mapping of keyword arguments
495 that can be passed to `api_client`. If `ARVADOS_API_HOST` or
496 `ARVADOS_API_TOKEN` are not configured, it raises `ValueError`.
500 * version: str | None --- A string naming the version of the Arvados API
501 to use. If not specified, the code will log a warning and fall back to
504 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
505 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
506 `ARVADOS_API_HOST_INSECURE`. If not provided, calls
507 `arvados.config.settings` to get these parameters from user
510 Additional keyword arguments will be included in the return value.
512 if apiconfig is None:
513 apiconfig = config.settings()
514 missing = " and ".join(
516 for key in ['ARVADOS_API_HOST', 'ARVADOS_API_TOKEN']
517 if key not in apiconfig
521 "%s not set.\nPlease set in %s or export environment variable." %
522 (missing, config.default_config_file),
524 return normalize_api_kwargs(
527 apiconfig['ARVADOS_API_HOST'],
528 apiconfig['ARVADOS_API_TOKEN'],
529 insecure=config.flag_is_true('ARVADOS_API_HOST_INSECURE', apiconfig),
534 version: Optional[str]=None,
536 host: Optional[str]=None,
537 token: Optional[str]=None,
538 insecure: bool=False,
539 request_id: Optional[str]=None,
542 discoveryServiceUrl: Optional[str]=None,
544 ) -> ThreadSafeAPIClient:
545 """Dynamically build an Arvados API client
547 This function provides a high-level "do what I mean" interface to build an
548 Arvados API client object. You can call it with no arguments to build a
549 client from user configuration; pass `host` and `token` arguments just
550 like you would write in user configuration; or pass additional arguments
551 for lower-level control over the client.
553 This function returns a `arvados.api.ThreadSafeAPIClient`, an
554 API-compatible wrapper around `googleapiclient.discovery.Resource`. If
555 you're handling concurrency yourself and/or your application is very
556 performance-sensitive, consider calling `api_client` directly.
560 * version: str | None --- A string naming the version of the Arvados API
561 to use. If not specified, the code will log a warning and fall back to
564 * host: str | None --- The hostname and optional port number of the
567 * token: str | None --- The authentication token to send with each API
570 * discoveryServiceUrl: str | None --- The URL used to discover APIs
571 passed directly to `googleapiclient.discovery.build`.
573 If `host`, `token`, and `discoveryServiceUrl` are all omitted, `host` and
574 `token` will be loaded from the user's configuration. Otherwise, you must
575 pass `token` and one of `host` or `discoveryServiceUrl`. It is an error to
576 pass both `host` and `discoveryServiceUrl`.
578 Other arguments are passed directly to `api_client`. See that function's
579 docstring for more information about their meaning.
584 request_id=request_id,
587 if discoveryServiceUrl or host or token:
588 kwargs.update(normalize_api_kwargs(version, discoveryServiceUrl, host, token))
590 kwargs.update(api_kwargs_from_config(version))
591 version = kwargs.pop('version')
592 return ThreadSafeAPIClient({}, {}, kwargs, version)
595 version: Optional[str]=None,
596 apiconfig: Optional[Mapping[str, str]]=None,
598 ) -> ThreadSafeAPIClient:
599 """Build an Arvados API client from a configuration mapping
601 This function builds an Arvados API client from a mapping with user
602 configuration. It accepts that mapping as an argument, so you can use a
603 configuration that's different from what the user has set up.
605 This function returns a `arvados.api.ThreadSafeAPIClient`, an
606 API-compatible wrapper around `googleapiclient.discovery.Resource`. If
607 you're handling concurrency yourself and/or your application is very
608 performance-sensitive, consider calling `api_client` directly.
612 * version: str | None --- A string naming the version of the Arvados API
613 to use. If not specified, the code will log a warning and fall back to
616 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
617 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
618 `ARVADOS_API_HOST_INSECURE`. If not provided, calls
619 `arvados.config.settings` to get these parameters from user
622 Other arguments are passed directly to `api_client`. See that function's
623 docstring for more information about their meaning.
625 return api(**api_kwargs_from_config(version, apiconfig, **kwargs))