25 import arvados.config as config
27 import arvados.retry as retry
31 # Workaround for urllib3 bug.
32 # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
33 # However, urllib3 prior to version 1.10 has a major bug in this feature
34 # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
35 # Unfortunately a certain major Linux distribution is stabilizing on urllib3
36 # 1.9.1 which means the following workaround is necessary to be able to use
37 # the arvados python sdk with the distribution-provided packages.
41 urllib3version = re.match(r'(\d+)\.(\d+)\.(\d+)', urllib3.__version__)
42 if (urllib3version and
43 int(urllib3version.group(1)) == 1 and
44 int(urllib3version.group(2)) >= 10):
47 from urllib3.contrib import pyopenssl
48 pyopenssl.extract_from_urllib3()
# Logger for this module's Keep client activity.
_logger = logging.getLogger(name='arvados.keep')
# Shared KeepClient behind the deprecated ``Keep`` facade. It starts unset and
# is (re)built on demand by Keep.global_client_object().
global_client_object = None
55 class KeepLocator(object):
56 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
57 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
59 def __init__(self, locator_str):
62 self._perm_expiry = None
63 pieces = iter(locator_str.split('+'))
64 self.md5sum = next(pieces)
66 self.size = int(next(pieces))
70 if self.HINT_RE.match(hint) is None:
71 raise ValueError("unrecognized hint data {}".format(hint))
72 elif hint.startswith('A'):
73 self.parse_permission_hint(hint)
75 self.hints.append(hint)
79 str(s) for s in [self.md5sum, self.size,
80 self.permission_hint()] + self.hints
84 if self.size is not None:
85 return "%s+%i" % (self.md5sum, self.size)
89 def _make_hex_prop(name, length):
90 # Build and return a new property with the given name that
91 # must be a hex string of the given length.
92 data_name = '_{}'.format(name)
94 return getattr(self, data_name)
95 def setter(self, hex_str):
96 if not arvados.util.is_hex(hex_str, length):
97 raise ValueError("{} must be a {}-digit hex string: {}".
98 format(name, length, hex_str))
99 setattr(self, data_name, hex_str)
100 return property(getter, setter)
# Validated hex-string properties: assigning a value that is not a hex string
# of the stated length raises ValueError.
md5sum = _make_hex_prop(name='md5sum', length=32)
perm_sig = _make_hex_prop(name='perm_sig', length=40)
def perm_expiry(self):
    """Return the stored permission expiry (a datetime), or None if unset."""
    expiry = self._perm_expiry
    return expiry
110 def perm_expiry(self, value):
111 if not arvados.util.is_hex(value, 1, 8):
113 "permission timestamp must be a hex Unix timestamp: {}".
115 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
117 def permission_hint(self):
118 data = [self.perm_sig, self.perm_expiry]
121 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
122 return "A{}@{:08x}".format(*data)
124 def parse_permission_hint(self, s):
126 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
128 raise ValueError("bad permission hint {}".format(s))
130 def permission_expired(self, as_of_dt=None):
131 if self.perm_expiry is None:
133 elif as_of_dt is None:
134 as_of_dt = datetime.datetime.now()
135 return self.perm_expiry <= as_of_dt
139 """Simple interface to a global KeepClient object.
141 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
142 own API client. The global KeepClient will build an API client from the
143 current Arvados configuration, which may not match the one you built.
148 def global_client_object(cls):
149 global global_client_object
150 # Previously, KeepClient would change its behavior at runtime based
151 # on these configuration settings. We simulate that behavior here
152 # by checking the values and returning a new KeepClient if any of
154 key = (config.get('ARVADOS_API_HOST'),
155 config.get('ARVADOS_API_TOKEN'),
156 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
157 config.get('ARVADOS_KEEP_PROXY'),
158 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
159 os.environ.get('KEEP_LOCAL_STORE'))
160 if (global_client_object is None) or (cls._last_key != key):
161 global_client_object = KeepClient()
163 return global_client_object
def get(locator, **kwargs):
    """Deprecated: read ``locator`` through the shared global KeepClient.

    Keyword arguments are forwarded unchanged to KeepClient.get().
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Deprecated: store ``data`` through the shared global KeepClient.

    Keyword arguments are forwarded unchanged to KeepClient.put().
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
173 class KeepBlockCache(object):
174 # Default RAM cache is 256MiB
175 def __init__(self, cache_max=(256 * 1024 * 1024)):
176 self.cache_max = cache_max
178 self._cache_lock = threading.Lock()
180 class CacheSlot(object):
181 def __init__(self, locator):
182 self.locator = locator
183 self.ready = threading.Event()
190 def set(self, value):
195 if self.content is None:
198 return len(self.content)
201 '''Cap the cache size to self.cache_max'''
202 with self._cache_lock:
203 # Select all slots except those where ready.is_set() and content is
204 # None (that means there was an error reading the block).
205 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
206 sm = sum([slot.size() for slot in self._cache])
207 while len(self._cache) > 0 and sm > self.cache_max:
208 for i in xrange(len(self._cache)-1, -1, -1):
209 if self._cache[i].ready.is_set():
212 sm = sum([slot.size() for slot in self._cache])
214 def _get(self, locator):
215 # Test if the locator is already in the cache
216 for i in xrange(0, len(self._cache)):
217 if self._cache[i].locator == locator:
220 # move it to the front
222 self._cache.insert(0, n)
def get(self, locator):
    """Look up the cache slot for ``locator`` while holding the cache lock.

    Returns whatever ``self._get(locator)`` returns (presumably the slot, or
    a falsy value when the locator is not cached -- confirm against _get).
    """
    cache_lock = self._cache_lock
    with cache_lock:
        found = self._get(locator)
    return found
230 def reserve_cache(self, locator):
231 '''Reserve a cache slot for the specified locator,
232 or return the existing slot.'''
233 with self._cache_lock:
234 n = self._get(locator)
238 # Add a new cache slot for the locator
239 n = KeepBlockCache.CacheSlot(locator)
240 self._cache.insert(0, n)
243 class KeepClient(object):
245 # Default Keep server connection timeout: 2 seconds
246 # Default Keep server read timeout: 300 seconds
247 # Default Keep proxy connection timeout: 20 seconds
248 # Default Keep proxy read timeout: 300 seconds
249 DEFAULT_TIMEOUT = (2, 300)
250 DEFAULT_PROXY_TIMEOUT = (20, 300)
252 class ThreadLimiter(object):
254 Limit the number of threads running at a given time to
255 {desired successes} minus {successes reported}. When successes
256 reported == desired, wake up the remaining threads and tell
259 Should be used in a "with" block.
261 def __init__(self, todo):
264 self._response = None
265 self._todo_lock = threading.Semaphore(todo)
266 self._done_lock = threading.Lock()
269 self._todo_lock.acquire()
def __exit__(self, type, value, traceback):
    """Give back the slot taken by __enter__; exceptions are not suppressed."""
    semaphore = self._todo_lock
    semaphore.release()
275 def shall_i_proceed(self):
277 Return true if the current thread should do stuff. Return
278 false if the current thread should just stop.
280 with self._done_lock:
281 return (self._done < self._todo)
283 def save_response(self, response_body, replicas_stored):
285 Records a response body (a locator, possibly signed) returned by
286 the Keep server. It is not necessary to save more than
287 one response, since we presume that any locator returned
288 in response to a successful request is valid.
290 with self._done_lock:
291 self._done += replicas_stored
292 self._response = response_body
296 Returns the body from the response to a PUT request.
298 with self._done_lock:
299 return self._response
303 Return how many successes were reported.
305 with self._done_lock:
309 class KeepService(object):
310 # Make requests to a single Keep service, and track results.
311 HTTP_ERRORS = (requests.exceptions.RequestException,
312 socket.error, ssl.SSLError)
314 def __init__(self, root, session, **headers):
316 self.last_result = None
317 self.success_flag = None
318 self.session = session
319 self.get_headers = {'Accept': 'application/octet-stream'}
320 self.get_headers.update(headers)
321 self.put_headers = headers
324 return self.success_flag is not False
327 return self.success_flag is not None
329 def last_status(self):
331 return self.last_result.status_code
332 except AttributeError:
335 def get(self, locator, timeout=None):
336 # locator is a KeepLocator object.
337 url = self.root + str(locator)
338 _logger.debug("Request: GET %s", url)
340 with timer.Timer() as t:
341 result = self.session.get(url.encode('utf-8'),
342 headers=self.get_headers,
344 except self.HTTP_ERRORS as e:
345 _logger.debug("Request fail: GET %s => %s: %s",
346 url, type(e), str(e))
349 self.last_result = result
350 self.success_flag = retry.check_http_response_success(result)
351 content = result.content
352 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
353 self.last_status(), len(content), t.msecs,
354 (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
355 if self.success_flag:
356 resp_md5 = hashlib.md5(content).hexdigest()
357 if resp_md5 == locator.md5sum:
359 _logger.warning("Checksum fail: md5(%s) = %s",
363 def put(self, hash_s, body, timeout=None):
364 url = self.root + hash_s
365 _logger.debug("Request: PUT %s", url)
367 result = self.session.put(url.encode('utf-8'),
369 headers=self.put_headers,
371 except self.HTTP_ERRORS as e:
372 _logger.debug("Request fail: PUT %s => %s: %s",
373 url, type(e), str(e))
376 self.last_result = result
377 self.success_flag = retry.check_http_response_success(result)
378 return self.success_flag
381 class KeepWriterThread(threading.Thread):
383 Write a blob of data to the given Keep server. On success, call
384 save_response() of the given ThreadLimiter to save the returned
387 def __init__(self, keep_service, **kwargs):
388 super(KeepClient.KeepWriterThread, self).__init__()
389 self.service = keep_service
391 self._success = False
397 with self.args['thread_limiter'] as limiter:
398 if not limiter.shall_i_proceed():
399 # My turn arrived, but the job has been done without
402 self.run_with_limiter(limiter)
404 def run_with_limiter(self, limiter):
405 if self.service.finished():
407 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
408 str(threading.current_thread()),
409 self.args['data_hash'],
410 len(self.args['data']),
411 self.args['service_root'])
412 self._success = bool(self.service.put(
413 self.args['data_hash'],
415 timeout=self.args.get('timeout', None)))
416 status = self.service.last_status()
418 result = self.service.last_result
419 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
420 str(threading.current_thread()),
421 self.args['data_hash'],
422 len(self.args['data']),
423 self.args['service_root'])
424 # Tick the 'done' counter for the number of replica
425 # reported stored by the server, for the case that
426 # we're talking to a proxy or other backend that
427 # stores to multiple copies for us.
429 replicas_stored = int(result.headers['x-keep-replicas-stored'])
430 except (KeyError, ValueError):
432 limiter.save_response(result.content.strip(), replicas_stored)
433 elif status is not None:
434 _logger.debug("Request fail: PUT %s => %s %s",
435 self.args['data_hash'], status,
436 self.service.last_result.content)
439 def __init__(self, api_client=None, proxy=None,
440 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
441 api_token=None, local_store=None, block_cache=None,
442 num_retries=0, session=None):
443 """Initialize a new KeepClient.
447 The API client to use to find Keep services. If not
448 provided, KeepClient will build one from available Arvados
452 If specified, this KeepClient will send requests to this Keep
453 proxy. Otherwise, KeepClient will fall back to the setting of the
454 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
455 KeepClient does not use a proxy, pass in an empty string.
458 The timeout (in seconds) for HTTP requests to Keep
459 non-proxy servers. A tuple of two floats is interpreted as
460 (connection_timeout, read_timeout): see
461 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
465 The timeout (in seconds) for HTTP requests to
466 Keep proxies. A tuple of two floats is interpreted as
467 (connection_timeout, read_timeout). Default: (20, 300).
470 If you're not using an API client, but only talking
471 directly to a Keep proxy, this parameter specifies an API token
472 to authenticate Keep requests. It is an error to specify both
473 api_client and api_token. If you specify neither, KeepClient
474 will use one available from the Arvados configuration.
477 If specified, this KeepClient will bypass Keep
478 services, and save data to the named directory. If unspecified,
479 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
480 environment variable. If you want to ensure KeepClient does not
481 use local storage, pass in an empty string. This is primarily
482 intended to mock a server for testing.
485 The default number of times to retry failed requests.
486 This will be used as the default num_retries value when get() and
487 put() are called. Default 0.
490 The requests.Session object to use for get() and put() requests.
491 Will create one if not specified.
493 self.lock = threading.Lock()
495 proxy = config.get('ARVADOS_KEEP_PROXY')
496 if api_token is None:
497 if api_client is None:
498 api_token = config.get('ARVADOS_API_TOKEN')
500 api_token = api_client.api_token
501 elif api_client is not None:
503 "can't build KeepClient with both API client and token")
504 if local_store is None:
505 local_store = os.environ.get('KEEP_LOCAL_STORE')
507 self.block_cache = block_cache if block_cache else KeepBlockCache()
508 self.timeout = timeout
509 self.proxy_timeout = proxy_timeout
512 self.local_store = local_store
513 self.get = self.local_store_get
514 self.put = self.local_store_put
516 self.num_retries = num_retries
517 self.session = session if session is not None else requests.Session()
519 if not proxy.endswith('/'):
521 self.api_token = api_token
522 self._keep_services = [{
524 '_service_root': proxy,
526 self.using_proxy = True
527 self._static_services_list = True
529 # It's important to avoid instantiating an API client
530 # unless we actually need one, for testing's sake.
531 if api_client is None:
532 api_client = arvados.api('v1')
533 self.api_client = api_client
534 self.api_token = api_client.api_token
535 self._keep_services = None
536 self.using_proxy = None
537 self._static_services_list = False
539 def current_timeout(self):
540 """Return the appropriate timeout to use for this client: the proxy
541 timeout setting if the backend service is currently a proxy,
542 the regular timeout setting otherwise.
544 # TODO(twp): the timeout should be a property of a
545 # KeepService, not a KeepClient. See #4488.
546 return self.proxy_timeout if self.using_proxy else self.timeout
548 def build_services_list(self, force_rebuild=False):
549 if (self._static_services_list or
550 (self._keep_services and not force_rebuild)):
554 keep_services = self.api_client.keep_services().accessible()
555 except Exception: # API server predates Keep services.
556 keep_services = self.api_client.keep_disks().list()
558 self._keep_services = keep_services.execute().get('items')
559 if not self._keep_services:
560 raise arvados.errors.NoKeepServersError()
562 self.using_proxy = any(ks.get('service_type') == 'proxy'
563 for ks in self._keep_services)
565 # Precompute the base URI for each service.
566 for r in self._keep_services:
567 r['_service_root'] = "{}://[{}]:{:d}/".format(
568 'https' if r['service_ssl_flag'] else 'http',
571 _logger.debug(str(self._keep_services))
573 def _service_weight(self, data_hash, service_uuid):
574 """Compute the weight of a Keep service endpoint for a data
575 block with a known hash.
577 The weight is md5(h + u) where u is the last 15 characters of
578 the service endpoint's UUID.
580 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
582 def weighted_service_roots(self, data_hash, force_rebuild=False):
583 """Return an array of Keep service endpoints, in the order in
584 which they should be probed when reading or writing data with
587 self.build_services_list(force_rebuild)
589 # Sort the available services by weight (heaviest first) for
590 # this data_hash, and return their service_roots (base URIs)
593 svc['_service_root'] for svc in sorted(
596 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
597 _logger.debug(data_hash + ': ' + str(sorted_roots))
600 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
601 # roots_map is a dictionary, mapping Keep service root strings
602 # to KeepService objects. Poll for Keep services, and add any
603 # new ones to roots_map. Return the current list of local
605 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
606 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
607 for root in local_roots:
608 if root not in roots_map:
609 roots_map[root] = self.KeepService(root, self.session, **headers)
613 def _check_loop_result(result):
614 # KeepClient RetryLoops should save results as a 2-tuple: the
615 # actual result of the request, and the number of servers available
616 # to receive the request this round.
617 # This method returns True if there's a real result, False if
618 # there are no more servers available, otherwise None.
619 if isinstance(result, Exception):
621 result, tried_server_count = result
622 if (result is not None) and (result is not False):
624 elif tried_server_count < 1:
625 _logger.info("No more Keep services to try; giving up")
630 def get_from_cache(self, loc):
631 """Fetch a block only if is in the cache, otherwise return None."""
632 slot = self.block_cache.get(loc)
633 if slot.ready.is_set():
639 def get(self, loc_s, num_retries=None):
640 """Get data from Keep.
642 This method fetches one or more blocks of data from Keep. It
643 sends a request each Keep service registered with the API
644 server (or the proxy provided when this client was
645 instantiated), then each service named in location hints, in
646 sequence. As soon as one service provides the data, it's
650 * loc_s: A string of one or more comma-separated locators to fetch.
651 This method returns the concatenation of these blocks.
652 * num_retries: The number of times to retry GET requests to
653 *each* Keep server if it returns temporary failures, with
654 exponential backoff. Note that, in each loop, the method may try
655 to fetch data from every available Keep service, along with any
656 that are named in location hints in the locator. The default value
657 is set when the KeepClient is initialized.
660 return ''.join(self.get(x) for x in loc_s.split(','))
661 locator = KeepLocator(loc_s)
662 expect_hash = locator.md5sum
663 slot, first = self.block_cache.reserve_cache(expect_hash)
668 # See #3147 for a discussion of the loop implementation. Highlights:
669 # * Refresh the list of Keep services after each failure, in case
670 # it's being updated.
671 # * Retry until we succeed, we're out of retries, or every available
672 # service has returned permanent failure.
673 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
674 for hint in locator.hints if hint.startswith('K@')]
675 # Map root URLs their KeepService objects.
676 roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
678 loop = retry.RetryLoop(num_retries, self._check_loop_result,
680 for tries_left in loop:
682 local_roots = self.map_new_services(
683 roots_map, expect_hash,
684 force_rebuild=(tries_left < num_retries))
685 except Exception as error:
686 loop.save_result(error)
689 # Query KeepService objects that haven't returned
690 # permanent failure, in our specified shuffle order.
691 services_to_try = [roots_map[root]
692 for root in (local_roots + hint_roots)
693 if roots_map[root].usable()]
694 for keep_service in services_to_try:
695 blob = keep_service.get(locator, timeout=self.current_timeout())
698 loop.save_result((blob, len(services_to_try)))
700 # Always cache the result, then return it if we succeeded.
702 self.block_cache.cap_cache()
707 all_roots = local_roots + hint_roots
709 # We never successfully fetched local_roots.
710 all_roots = hint_roots
711 # Q: Including 403 is necessary for the Keep tests to continue
712 # passing, but maybe they should expect KeepReadError instead?
713 not_founds = sum(1 for key in all_roots
714 if roots_map[key].last_status() in {403, 404, 410})
715 service_errors = ((key, roots_map[key].last_result)
716 for key in all_roots)
718 raise arvados.errors.KeepReadError(
719 "failed to read {}: no Keep services available ({})".format(
720 loc_s, loop.last_result()))
721 elif not_founds == len(all_roots):
722 raise arvados.errors.NotFoundError(
723 "{} not found".format(loc_s), service_errors)
725 raise arvados.errors.KeepReadError(
726 "failed to read {}".format(loc_s), service_errors, label="service")
729 def put(self, data, copies=2, num_retries=None):
730 """Save data in Keep.
732 This method will get a list of Keep services from the API server, and
733 send the data to each one simultaneously in a new thread. Once the
734 uploads are finished, if enough copies are saved, this method returns
735 the most recent HTTP response body. If requests fail to upload
736 enough copies, this method raises KeepWriteError.
739 * data: The string of data to upload.
740 * copies: The number of copies that the user requires be saved.
742 * num_retries: The number of times to retry PUT requests to
743 *each* Keep server if it returns temporary failures, with
744 exponential backoff. The default value is set when the
745 KeepClient is initialized.
748 if isinstance(data, unicode):
749 data = data.encode("ascii")
750 elif not isinstance(data, str):
751 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
753 data_hash = hashlib.md5(data).hexdigest()
759 # Tell the proxy how many copies we want it to store
760 headers['X-Keep-Desired-Replication'] = str(copies)
762 thread_limiter = KeepClient.ThreadLimiter(copies)
763 loop = retry.RetryLoop(num_retries, self._check_loop_result,
765 for tries_left in loop:
767 local_roots = self.map_new_services(
768 roots_map, data_hash,
769 force_rebuild=(tries_left < num_retries), **headers)
770 except Exception as error:
771 loop.save_result(error)
775 for service_root, ks in roots_map.iteritems():
778 t = KeepClient.KeepWriterThread(
782 service_root=service_root,
783 thread_limiter=thread_limiter,
784 timeout=self.current_timeout())
789 loop.save_result((thread_limiter.done() >= copies, len(threads)))
792 return thread_limiter.response()
794 raise arvados.errors.KeepWriteError(
795 "failed to write {}: no Keep services available ({})".format(
796 data_hash, loop.last_result()))
798 service_errors = ((key, roots_map[key].last_result)
799 for key in local_roots
800 if not roots_map[key].success_flag)
801 raise arvados.errors.KeepWriteError(
802 "failed to write {} (wanted {} copies but wrote {})".format(
803 data_hash, copies, thread_limiter.done()), service_errors, label="service")
805 def local_store_put(self, data, copies=1, num_retries=None):
808 This method is used in place of the real put() method when
809 using local storage (see constructor's local_store argument).
811 copies and num_retries arguments are ignored: they are here
812 only for the sake of offering the same call signature as
815 Data stored this way can be retrieved via local_store_get().
817 md5 = hashlib.md5(data).hexdigest()
818 locator = '%s+%d' % (md5, len(data))
819 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
821 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
822 os.path.join(self.local_store, md5))
825 def local_store_get(self, loc_s, num_retries=None):
826 """Companion to local_store_put()."""
828 locator = KeepLocator(loc_s)
830 raise arvados.errors.NotFoundError(
831 "Invalid data locator: '%s'" % loc_s)
832 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
834 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
def is_cached(self, locator):
    """Report whether the data for ``locator`` is already in the block cache.

    ``locator`` is a locator string ("md5[+size[+hints]]") or an object whose
    str() is one (e.g. a KeepLocator).  Returns True only when a cache slot
    for the block exists, has been marked ready and holds content.

    The previous body passed an undefined name (``expect_hash``) and so
    always raised NameError.  It also went through ``reserve_cache``, which
    inserts an empty slot as a side effect.  get() uses reserve_cache too
    (``slot, first = ...reserve_cache(...)``), so a stray, never-filled
    reservation could be mistaken for an in-flight fetch -- NOTE(review):
    confirm against get().  This version only looks the slot up.
    """
    # The cache is keyed by the bare md5 digest (get() passes
    # locator.md5sum), so drop any "+size" / "+hint" suffixes first.
    md5sum = str(locator).split('+', 1)[0]
    slot = self.block_cache.get(md5sum)
    if not slot:
        return False
    # A ready slot whose content is None records a failed read, not data.
    return slot.ready.is_set() and slot.content is not None