25 import arvados.config as config
27 import arvados.retry as retry
31 # Workaround for urllib3 bug.
32 # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
33 # However, urllib3 prior to version 1.10 has a major bug in this feature
34 # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
35 # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
36 # following workaround is necessary to be able to use
37 # the arvados python sdk with the distribution-provided packages.
39 from pkg_resources import parse_version
# Only apply the pyopenssl extraction workaround on urllib3 releases older
# than 1.10 (see the bug description in the header comment above).
40 if parse_version(urllib3.__version__) < parse_version('1.10'):
41 from urllib3.contrib import pyopenssl
42 pyopenssl.extract_from_urllib3()
# Module-level logger shared by all Keep client classes in this file.
46 _logger = logging.getLogger('arvados.keep')
# Lazily-built singleton KeepClient used by the deprecated Keep facade below.
47 global_client_object = None
49 class KeepLocator(object):
# Parses and reconstructs Keep block locators of the form
# "<md5sum>+<size>+<hint>...", including "A<sig>@<hextimestamp>" permission
# hints.  NOTE(review): this paste drops some original source lines (the
# embedded line numbers skip); comments describe only the visible code.
50 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Hints are one uppercase letter followed by alphanumerics, '@', '_' or '-'.
51 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
53 def __init__(self, locator_str):
56 self._perm_expiry = None
# Locator fields are '+'-separated: md5sum first, then size, then hints.
57 pieces = iter(locator_str.split('+'))
58 self.md5sum = next(pieces)
60 self.size = int(next(pieces))
# Remaining pieces must look like hints; an 'A...' hint carries the
# permission signature and expiry.
64 if self.HINT_RE.match(hint) is None:
65 raise ValueError("invalid hint format: {}".format(hint))
66 elif hint.startswith('A'):
67 self.parse_permission_hint(hint)
69 self.hints.append(hint)
# __str__ (partially visible): rejoin md5sum, size, permission hint and
# remaining hints with '+', presumably skipping empty fields — confirm.
73 str(s) for s in [self.md5sum, self.size,
74 self.permission_hint()] + self.hints
# stripped locator (partially visible): md5sum plus size, no hints.
78 if self.size is not None:
79 return "%s+%i" % (self.md5sum, self.size)
83 def _make_hex_prop(name, length):
84 # Build and return a new property with the given name that
85 # must be a hex string of the given length.
86 data_name = '_{}'.format(name)
88 return getattr(self, data_name)
89 def setter(self, hex_str):
# Validate before storing: the backing attribute only ever holds a hex
# string of exactly `length` digits (or its initial None).
90 if not arvados.util.is_hex(hex_str, length):
91 raise ValueError("{} must be a {}-digit hex string: {}".
92 format(name, length, hex_str))
93 setattr(self, data_name, hex_str)
94 return property(getter, setter)
# md5sum is a 32-digit hex digest; perm_sig is a 40-digit hex signature.
96 md5sum = _make_hex_prop('md5sum', 32)
97 perm_sig = _make_hex_prop('perm_sig', 40)
100 def perm_expiry(self):
# Getter: expiry as a naive UTC datetime (set from a hex Unix timestamp).
101 return self._perm_expiry
104 def perm_expiry(self, value):
# Setter: `value` is a 1- to 8-digit hex Unix timestamp string.
105 if not arvados.util.is_hex(value, 1, 8):
107 "permission timestamp must be a hex Unix timestamp: {}".
109 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
111 def permission_hint(self):
# Rebuild the "A<sig>@<hex timestamp>" hint from the parsed fields.
112 data = [self.perm_sig, self.perm_expiry]
# Convert the expiry datetime back to integer seconds since the UTC epoch.
115 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
116 return "A{}@{:08x}".format(*data)
118 def parse_permission_hint(self, s):
# "A<sig>@<timestamp>": strip the leading 'A', split on the first '@'.
# The property setters above validate both halves.
120 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
122 raise ValueError("bad permission hint {}".format(s))
124 def permission_expired(self, as_of_dt=None):
125 if self.perm_expiry is None:
127 elif as_of_dt is None:
128 as_of_dt = datetime.datetime.now()
129 return self.perm_expiry <= as_of_dt
133 """Simple interface to a global KeepClient object.
135 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
136 own API client. The global KeepClient will build an API client from the
137 current Arvados configuration, which may not match the one you built.
142 def global_client_object(cls):
# Deprecated singleton accessor (presumably decorated @classmethod in the
# elided lines — confirm): build or reuse a module-level KeepClient.
143 global global_client_object
144 # Previously, KeepClient would change its behavior at runtime based
145 # on these configuration settings. We simulate that behavior here
146 # by checking the values and returning a new KeepClient if any of
# them have changed since the cached client was built.
148 key = (config.get('ARVADOS_API_HOST'),
149 config.get('ARVADOS_API_TOKEN'),
150 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
151 config.get('ARVADOS_KEEP_PROXY'),
152 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
153 os.environ.get('KEEP_LOCAL_STORE'))
# Rebuild when no client exists yet or the configuration key changed.
# NOTE(review): the line recording `key` into cls._last_key is elided here.
154 if (global_client_object is None) or (cls._last_key != key):
155 global_client_object = KeepClient()
157 return global_client_object
# Deprecated module-facade wrapper: read via the singleton KeepClient.
160 def get(locator, **kwargs):
161 return Keep.global_client_object().get(locator, **kwargs)
# Deprecated module-facade wrapper: write via the singleton KeepClient.
164 def put(data, **kwargs):
165 return Keep.global_client_object().put(data, **kwargs)
167 class KeepBlockCache(object):
# In-memory cache of Keep blocks, shared by a KeepClient and protected by
# a single lock.  NOTE(review): several original lines are elided in this
# paste; comments cover only the visible code.
168 # Default RAM cache is 256MiB
169 def __init__(self, cache_max=(256 * 1024 * 1024)):
# cache_max: soft cap on total cached bytes, enforced by cap_cache().
170 self.cache_max = cache_max
172 self._cache_lock = threading.Lock()
174 class CacheSlot(object):
# One cache entry: consumers wait on `ready` until content is stored.
175 def __init__(self, locator):
176 self.locator = locator
177 self.ready = threading.Event()
# set() (partially visible) stores the block and signals `ready`.
184 def set(self, value):
# size() (partially visible): 0 for empty/error slots, else content length.
189 if self.content is None:
192 return len(self.content)
195 '''Cap the cache size to self.cache_max'''
196 with self._cache_lock:
197 # Select all slots except those where ready.is_set() and content is
198 # None (that means there was an error reading the block).
199 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
200 sm = sum([slot.size() for slot in self._cache])
# Evict ready slots starting from the back of the list (least recently
# used) until the total size fits under the cap.
201 while len(self._cache) > 0 and sm > self.cache_max:
202 for i in xrange(len(self._cache)-1, -1, -1):
203 if self._cache[i].ready.is_set():
206 sm = sum([slot.size() for slot in self._cache])
208 def _get(self, locator):
# Caller must hold _cache_lock.  Linear scan; a hit is moved to the
# front so the list stays roughly ordered by recency of use.
209 # Test if the locator is already in the cache
210 for i in xrange(0, len(self._cache)):
211 if self._cache[i].locator == locator:
214 # move it to the front
216 self._cache.insert(0, n)
220 def get(self, locator):
# Thread-safe lookup wrapper around _get().
221 with self._cache_lock:
222 return self._get(locator)
224 def reserve_cache(self, locator):
225 '''Reserve a cache slot for the specified locator,
226 or return the existing slot.'''
# Presumably returns (slot, is_new) — the return lines are elided here;
# callers such as KeepClient.get() unpack two values.  TODO confirm.
227 with self._cache_lock:
228 n = self._get(locator)
232 # Add a new cache slot for the locator
233 n = KeepBlockCache.CacheSlot(locator)
234 self._cache.insert(0, n)
237 class KeepClient(object):
# Client for reading and writing data blocks to Keep services, either
# directly (probe order weighted by locator hash) or through a Keep proxy.
239 # Default Keep server connection timeout: 2 seconds
240 # Default Keep server read timeout: 300 seconds
241 # Default Keep proxy connection timeout: 20 seconds
242 # Default Keep proxy read timeout: 300 seconds
# (connection_timeout, read_timeout) tuples, in the form requests expects.
243 DEFAULT_TIMEOUT = (2, 300)
244 DEFAULT_PROXY_TIMEOUT = (20, 300)
246 class ThreadLimiter(object):
248 Limit the number of threads running at a given time to
249 {desired successes} minus {successes reported}. When successes
250 reported == desired, wake up the remaining threads and tell
253 Should be used in a "with" block.
255 def __init__(self, todo):
# todo: number of successful replica stores the caller wants.
258 self._response = None
# The semaphore admits at most `todo` writer threads concurrently; the
# lock guards the _done counter and the saved response body.
259 self._todo_lock = threading.Semaphore(todo)
260 self._done_lock = threading.Lock()
# __enter__ (def line elided in this paste): block until a slot is free.
263 self._todo_lock.acquire()
266 def __exit__(self, type, value, traceback):
267 self._todo_lock.release()
269 def shall_i_proceed(self):
271 Return true if the current thread should do stuff. Return
272 false if the current thread should just stop.
274 with self._done_lock:
275 return (self._done < self._todo)
277 def save_response(self, response_body, replicas_stored):
279 Records a response body (a locator, possibly signed) returned by
280 the Keep server. It is not necessary to save more than
281 one response, since we presume that any locator returned
282 in response to a successful request is valid.
284 with self._done_lock:
285 self._done += replicas_stored
286 self._response = response_body
# response() (def line elided):
290 Returns the body from the response to a PUT request.
292 with self._done_lock:
293 return self._response
# done() (def line elided):
297 Return how many successes were reported.
299 with self._done_lock:
303 class KeepService(object):
304 # Make requests to a single Keep service, and track results.
# Exception types treated as transport-level failures (as opposed to
# HTTP responses with error status codes).
305 HTTP_ERRORS = (requests.exceptions.RequestException,
306 socket.error, ssl.SSLError)
308 def __init__(self, root, session, **headers):
# root: the service base URL; extra **headers accompany every request.
310 self.last_result = None
# success_flag is tri-state: None = untried, True = ok, False = failed.
311 self.success_flag = None
312 self.session = session
313 self.get_headers = {'Accept': 'application/octet-stream'}
314 self.get_headers.update(headers)
315 self.put_headers = headers
# usable() (def line elided): anything but a recorded permanent failure.
318 return self.success_flag is not False
# finished() (def line elided): True once any request has completed.
321 return self.success_flag is not None
323 def last_status(self):
# HTTP status of the most recent response; None if no response yet
# (last_result is None, so .status_code raises AttributeError).
325 return self.last_result.status_code
326 except AttributeError:
329 def get(self, locator, timeout=None):
330 # locator is a KeepLocator object.
331 url = self.root + str(locator)
332 _logger.debug("Request: GET %s", url)
334 with timer.Timer() as t:
335 result = self.session.get(url.encode('utf-8'),
336 headers=self.get_headers,
338 except self.HTTP_ERRORS as e:
# Transport failure: log it (failure bookkeeping lines are elided here).
339 _logger.debug("Request fail: GET %s => %s: %s",
340 url, type(e), str(e))
343 self.last_result = result
344 self.success_flag = retry.check_http_response_success(result)
345 content = result.content
346 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
347 self.last_status(), len(content), t.msecs,
348 (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
349 if self.success_flag:
# Verify data integrity: the body's md5 must match the locator's hash.
350 resp_md5 = hashlib.md5(content).hexdigest()
351 if resp_md5 == locator.md5sum:
353 _logger.warning("Checksum fail: md5(%s) = %s",
357 def put(self, hash_s, body, timeout=None):
358 url = self.root + hash_s
359 _logger.debug("Request: PUT %s", url)
361 result = self.session.put(url.encode('utf-8'),
363 headers=self.put_headers,
365 except self.HTTP_ERRORS as e:
366 _logger.debug("Request fail: PUT %s => %s: %s",
367 url, type(e), str(e))
370 self.last_result = result
371 self.success_flag = retry.check_http_response_success(result)
372 return self.success_flag
375 class KeepWriterThread(threading.Thread):
377 Write a blob of data to the given Keep server. On success, call
378 save_response() of the given ThreadLimiter to save the returned
381 def __init__(self, keep_service, **kwargs):
382 super(KeepClient.KeepWriterThread, self).__init__()
383 self.service = keep_service
385 self._success = False
# run() (partially visible): acquire a ThreadLimiter slot, then only do
# the write if more replicas are still needed.
391 with self.args['thread_limiter'] as limiter:
392 if not limiter.shall_i_proceed():
393 # My turn arrived, but the job has been done without
396 self.run_with_limiter(limiter)
398 def run_with_limiter(self, limiter):
# Skip services that have already returned a definitive result.
399 if self.service.finished():
401 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
402 str(threading.current_thread()),
403 self.args['data_hash'],
404 len(self.args['data']),
405 self.args['service_root'])
406 self._success = bool(self.service.put(
407 self.args['data_hash'],
409 timeout=self.args.get('timeout', None)))
410 status = self.service.last_status()
412 result = self.service.last_result
413 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
414 str(threading.current_thread()),
415 self.args['data_hash'],
416 len(self.args['data']),
417 self.args['service_root'])
418 # Tick the 'done' counter for the number of replica
419 # reported stored by the server, for the case that
420 # we're talking to a proxy or other backend that
421 # stores to multiple copies for us.
423 replicas_stored = int(result.headers['x-keep-replicas-stored'])
424 except (KeyError, ValueError):
# Presumably defaults replicas_stored to 1 when the header is absent
# or malformed — the fallback line is elided in this paste.
426 limiter.save_response(result.content.strip(), replicas_stored)
427 elif status is not None:
428 _logger.debug("Request fail: PUT %s => %s %s",
429 self.args['data_hash'], status,
430 self.service.last_result.content)
433 def __init__(self, api_client=None, proxy=None,
434 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
435 api_token=None, local_store=None, block_cache=None,
436 num_retries=0, session=None):
437 """Initialize a new KeepClient.
441 The API client to use to find Keep services. If not
442 provided, KeepClient will build one from available Arvados
446 If specified, this KeepClient will send requests to this Keep
447 proxy. Otherwise, KeepClient will fall back to the setting of the
448 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
449 KeepClient does not use a proxy, pass in an empty string.
452 The initial timeout (in seconds) for HTTP requests to Keep
453 non-proxy servers. A tuple of two floats is interpreted as
454 (connection_timeout, read_timeout): see
455 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
456 Because timeouts are often a result of transient server load, the
457 actual connection timeout will be increased by a factor of two on
462 The initial timeout (in seconds) for HTTP requests to
463 Keep proxies. A tuple of two floats is interpreted as
464 (connection_timeout, read_timeout). The behavior described
465 above for adjusting connection timeouts on retry also applies.
469 If you're not using an API client, but only talking
470 directly to a Keep proxy, this parameter specifies an API token
471 to authenticate Keep requests. It is an error to specify both
472 api_client and api_token. If you specify neither, KeepClient
473 will use one available from the Arvados configuration.
476 If specified, this KeepClient will bypass Keep
477 services, and save data to the named directory. If unspecified,
478 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
479 environment variable. If you want to ensure KeepClient does not
480 use local storage, pass in an empty string. This is primarily
481 intended to mock a server for testing.
484 The default number of times to retry failed requests.
485 This will be used as the default num_retries value when get() and
486 put() are called. Default 0.
489 The requests.Session object to use for get() and put() requests.
490 Will create one if not specified.
492 self.lock = threading.Lock()
# Fall back to the Arvados configuration for any unspecified settings.
494 proxy = config.get('ARVADOS_KEEP_PROXY')
495 if api_token is None:
496 if api_client is None:
497 api_token = config.get('ARVADOS_API_TOKEN')
499 api_token = api_client.api_token
500 elif api_client is not None:
# Both given would make it ambiguous which token wins, so refuse.
502 "can't build KeepClient with both API client and token")
503 if local_store is None:
504 local_store = os.environ.get('KEEP_LOCAL_STORE')
506 self.block_cache = block_cache if block_cache else KeepBlockCache()
507 self.timeout = timeout
508 self.proxy_timeout = proxy_timeout
# Local-store mode: swap get/put for the filesystem-backed versions.
511 self.local_store = local_store
512 self.get = self.local_store_get
513 self.put = self.local_store_put
515 self.num_retries = num_retries
516 self.session = session if session is not None else requests.Session()
# Proxy mode: a single static "service" entry pointing at the proxy.
518 if not proxy.endswith('/'):
520 self.api_token = api_token
521 self._gateway_services = {}
522 self._keep_services = [{
524 '_service_root': proxy,
526 self.using_proxy = True
527 self._static_services_list = True
529 # It's important to avoid instantiating an API client
530 # unless we actually need one, for testing's sake.
531 if api_client is None:
532 api_client = arvados.api('v1')
533 self.api_client = api_client
534 self.api_token = api_client.api_token
# Service list is discovered lazily by build_services_list().
535 self._gateway_services = {}
536 self._keep_services = None
537 self.using_proxy = None
538 self._static_services_list = False
540 def current_timeout(self, attempt_number):
541 """Return the appropriate timeout to use for this client.
543 The proxy timeout setting if the backend service is currently a proxy,
544 the regular timeout setting otherwise. The `attempt_number` indicates
545 how many times the operation has been tried already (starting from 0
546 for the first try), and scales the connection timeout portion of the
547 return value accordingly.
550 # TODO(twp): the timeout should be a property of a
551 # KeepService, not a KeepClient. See #4488.
552 t = self.proxy_timeout if self.using_proxy else self.timeout
# Exponential backoff on the connect timeout only; read timeout is fixed.
553 return (t[0] * (1 << attempt_number), t[1])
555 def build_services_list(self, force_rebuild=False):
# Populate self._keep_services / self._gateway_services from the API
# server, unless a static proxy list is in use or a cached list exists
# and no rebuild was forced.
556 if (self._static_services_list or
557 (self._keep_services and not force_rebuild)):
561 keep_services = self.api_client.keep_services().accessible()
562 except Exception: # API server predates Keep services.
563 keep_services = self.api_client.keep_disks().list()
565 accessible = keep_services.execute().get('items')
567 raise arvados.errors.NoKeepServersError()
569 # Precompute the base URI for each service.
571 r['_service_root'] = "{}://[{}]:{:d}/".format(
572 'https' if r['service_ssl_flag'] else 'http',
576 # Gateway services are only used when specified by UUID,
577 # so there's nothing to gain by filtering them by
579 self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
580 _logger.debug(str(self._gateway_services))
# Only disk and proxy services take part in normal reads/writes.
582 self._keep_services = [
583 ks for ks in accessible
584 if ks.get('service_type') in ['disk', 'proxy']]
585 _logger.debug(str(self._keep_services))
587 self.using_proxy = any(ks.get('service_type') == 'proxy'
588 for ks in self._keep_services)
590 def _service_weight(self, data_hash, service_uuid):
591 """Compute the weight of a Keep service endpoint for a data
592 block with a known hash.
594 The weight is md5(h + u) where u is the last 15 characters of
595 the service endpoint's UUID.
# Used by weighted_service_roots() as a sort key, so every client
# derives the same per-block probe order for a given set of services.
597 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
599 def weighted_service_roots(self, locator, force_rebuild=False):
600 """Return an array of Keep service endpoints, in the order in
601 which they should be probed when reading or writing data with
602 the given hash+hints.
604 self.build_services_list(force_rebuild)
608 # Use the services indicated by the given +K@... remote
609 # service hints, if any are present and can be resolved to a
611 for hint in locator.hints:
612 if hint.startswith('K@'):
# Short "K@<site>" hint: a remote cluster's public Keep proxy.
615 "https://keep.{}.arvadosapi.com/".format(hint[2:]))
# 29-character hint ("K@" + 27-char UUID): a local gateway service.
616 elif len(hint) == 29:
617 svc = self._gateway_services.get(hint[2:])
619 sorted_roots.append(svc['_service_root'])
621 # Sort the available local services by weight (heaviest first)
622 # for this locator, and return their service_roots (base URIs)
624 sorted_roots.extend([
625 svc['_service_root'] for svc in sorted(
628 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
629 _logger.debug("{}: {}".format(locator, sorted_roots))
632 def map_new_services(self, roots_map, locator, force_rebuild, **headers):
633 # roots_map is a dictionary, mapping Keep service root strings
634 # to KeepService objects. Poll for Keep services, and add any
635 # new ones to roots_map. Return the current list of local
637 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
638 local_roots = self.weighted_service_roots(locator, force_rebuild)
639 for root in local_roots:
640 if root not in roots_map:
# Reuse existing KeepService objects so per-service success/failure
# state survives across retry rounds.
641 roots_map[root] = self.KeepService(root, self.session, **headers)
645 def _check_loop_result(result):
646 # KeepClient RetryLoops should save results as a 2-tuple: the
647 # actual result of the request, and the number of servers available
648 # to receive the request this round.
649 # This method returns True if there's a real result, False if
650 # there are no more servers available, otherwise None.
# A saved exception counts as a retryable failure (its handling line is
# elided in this paste).
651 if isinstance(result, Exception):
653 result, tried_server_count = result
654 if (result is not None) and (result is not False):
656 elif tried_server_count < 1:
657 _logger.info("No more Keep services to try; giving up")
662 def get_from_cache(self, loc):
663 """Fetch a block only if is in the cache, otherwise return None."""
664 slot = self.block_cache.get(loc)
# Only return content that has finished loading; never block waiting.
665 if slot and slot.ready.is_set():
671 def get(self, loc_s, num_retries=None):
672 """Get data from Keep.
674 This method fetches one or more blocks of data from Keep. It
675 sends a request each Keep service registered with the API
676 server (or the proxy provided when this client was
677 instantiated), then each service named in location hints, in
678 sequence. As soon as one service provides the data, it's
682 * loc_s: A string of one or more comma-separated locators to fetch.
683 This method returns the concatenation of these blocks.
684 * num_retries: The number of times to retry GET requests to
685 *each* Keep server if it returns temporary failures, with
686 exponential backoff. Note that, in each loop, the method may try
687 to fetch data from every available Keep service, along with any
688 that are named in location hints in the locator. The default value
689 is set when the KeepClient is initialized.
# Multiple comma-separated locators: fetch each one recursively.
692 return ''.join(self.get(x) for x in loc_s.split(','))
693 locator = KeepLocator(loc_s)
694 slot, first = self.block_cache.reserve_cache(locator.md5sum)
699 # If the locator has hints specifying a prefix (indicating a
700 # remote keepproxy) or the UUID of a local gateway service,
701 # read data from the indicated service(s) instead of the usual
702 # list of local disk services.
# NOTE(review): this builds "http://" hint roots while
# weighted_service_roots() builds "https://" for the same K@ hints —
# looks inconsistent; confirm which scheme is intended.
703 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
704 for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
705 hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
706 for hint in locator.hints if (
707 hint.startswith('K@') and
709 self._gateway_services.get(hint[2:])
711 # Map root URLs to their KeepService objects.
712 roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
714 # See #3147 for a discussion of the loop implementation. Highlights:
715 # * Refresh the list of Keep services after each failure, in case
716 # it's being updated.
717 # * Retry until we succeed, we're out of retries, or every available
718 # service has returned permanent failure.
722 loop = retry.RetryLoop(num_retries, self._check_loop_result,
724 for tries_left in loop:
726 sorted_roots = self.map_new_services(
728 force_rebuild=(tries_left < num_retries))
729 except Exception as error:
# Service discovery failed; let the retry loop decide what to do.
730 loop.save_result(error)
733 # Query KeepService objects that haven't returned
734 # permanent failure, in our specified shuffle order.
735 services_to_try = [roots_map[root]
736 for root in sorted_roots
737 if roots_map[root].usable()]
738 for keep_service in services_to_try:
739 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
742 loop.save_result((blob, len(services_to_try)))
744 # Always cache the result, then return it if we succeeded.
746 self.block_cache.cap_cache()
750 # Q: Including 403 is necessary for the Keep tests to continue
751 # passing, but maybe they should expect KeepReadError instead?
752 not_founds = sum(1 for key in sorted_roots
753 if roots_map[key].last_status() in {403, 404, 410})
754 service_errors = ((key, roots_map[key].last_result)
755 for key in sorted_roots)
# Raise the most specific error: no services reachable at all, every
# service said "not found", or a mixed bag of failures.
757 raise arvados.errors.KeepReadError(
758 "failed to read {}: no Keep services available ({})".format(
759 loc_s, loop.last_result()))
760 elif not_founds == len(sorted_roots):
761 raise arvados.errors.NotFoundError(
762 "{} not found".format(loc_s), service_errors)
764 raise arvados.errors.KeepReadError(
765 "failed to read {}".format(loc_s), service_errors, label="service")
768 def put(self, data, copies=2, num_retries=None):
769 """Save data in Keep.
771 This method will get a list of Keep services from the API server, and
772 send the data to each one simultaneously in a new thread. Once the
773 uploads are finished, if enough copies are saved, this method returns
774 the most recent HTTP response body. If requests fail to upload
775 enough copies, this method raises KeepWriteError.
778 * data: The string of data to upload.
779 * copies: The number of copies that the user requires be saved.
781 * num_retries: The number of times to retry PUT requests to
782 *each* Keep server if it returns temporary failures, with
783 exponential backoff. The default value is set when the
784 KeepClient is initialized.
# Keep stores raw bytes; unicode input must be ASCII-encodable.
787 if isinstance(data, unicode):
788 data = data.encode("ascii")
789 elif not isinstance(data, str):
790 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
792 data_hash = hashlib.md5(data).hexdigest()
795 locator = KeepLocator(data_hash + '+' + str(len(data)))
799 # Tell the proxy how many copies we want it to store
800 headers['X-Keep-Desired-Replication'] = str(copies)
# The ThreadLimiter stops admitting writers once `copies` replicas
# have been reported stored.
802 thread_limiter = KeepClient.ThreadLimiter(copies)
803 loop = retry.RetryLoop(num_retries, self._check_loop_result,
805 for tries_left in loop:
807 local_roots = self.map_new_services(
809 force_rebuild=(tries_left < num_retries), **headers)
810 except Exception as error:
811 loop.save_result(error)
# One writer thread per Keep service, all started this round.
815 for service_root, ks in roots_map.iteritems():
818 t = KeepClient.KeepWriterThread(
822 service_root=service_root,
823 thread_limiter=thread_limiter,
824 timeout=self.current_timeout(num_retries-tries_left))
829 loop.save_result((thread_limiter.done() >= copies, len(threads)))
832 return thread_limiter.response()
# Distinguish "no services at all" from "not enough copies written".
834 raise arvados.errors.KeepWriteError(
835 "failed to write {}: no Keep services available ({})".format(
836 data_hash, loop.last_result()))
838 service_errors = ((key, roots_map[key].last_result)
839 for key in local_roots
840 if not roots_map[key].success_flag)
841 raise arvados.errors.KeepWriteError(
842 "failed to write {} (wanted {} copies but wrote {})".format(
843 data_hash, copies, thread_limiter.done()), service_errors, label="service")
845 def local_store_put(self, data, copies=1, num_retries=None):
848 This method is used in place of the real put() method when
849 using local storage (see constructor's local_store argument).
851 copies and num_retries arguments are ignored: they are here
852 only for the sake of offering the same call signature as
855 Data stored this way can be retrieved via local_store_get().
857 md5 = hashlib.md5(data).hexdigest()
858 locator = '%s+%d' % (md5, len(data))
# Write to a ".tmp" file first, then rename, so readers never see a
# partially-written block.
859 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
861 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
862 os.path.join(self.local_store, md5))
865 def local_store_get(self, loc_s, num_retries=None):
866 """Companion to local_store_put()."""
868 locator = KeepLocator(loc_s)
870 raise arvados.errors.NotFoundError(
871 "Invalid data locator: '%s'" % loc_s)
# The well-known empty block is answered without touching the disk.
872 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
874 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
877 def is_cached(self, locator):
878 return self.block_cache.reserve_cache(expect_hash)