import datetime
import hashlib
import logging
import os
import re
import socket
import ssl
import threading
import timer  # sibling module in this package (arvados/timer.py)

import requests

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util
# Workaround for urllib3 bug.
# The 'requests' library enables urllib3's SNI support by default, which uses
# pyopenssl.  However, urllib3 prior to version 1.10 has a major bug in this
# feature (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412).
# Unfortunately Debian 8 is stabilizing on urllib3 1.9.1, which means the
# following workaround is necessary for the Arvados Python SDK to work with
# the distribution-provided packages.
import urllib3
from pkg_resources import parse_version
if parse_version(urllib3.__version__) < parse_version('1.10'):
    from urllib3.contrib import pyopenssl
    pyopenssl.extract_from_urllib3()

_logger = logging.getLogger('arvados.keep')
global_client_object = None

class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
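

# A minimal usage sketch for KeepLocator (illustrative only, not part of the
# API; the hash below is the well-known md5 of the empty string, and 'K@xyzzy'
# is a hypothetical location hint).  This function is never called by the SDK.
def _keep_locator_example():
    loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy')
    assert loc.md5sum == 'd41d8cd98f00b204e9800998ecf8427e'
    assert loc.size == 0
    assert loc.hints == ['K@xyzzy']
    # str() round-trips the locator; stripped() drops hints and signatures.
    assert str(loc) == 'd41d8cd98f00b204e9800998ecf8427e+0+K@xyzzy'
    assert loc.stripped() == 'd41d8cd98f00b204e9800998ecf8427e+0'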
133 """Simple interface to a global KeepClient object.
135 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
136 own API client. The global KeepClient will build an API client from the
137 current Arvados configuration, which may not match the one you built.

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
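

# Illustrative sketch (never called by the SDK) contrasting the deprecated
# global interface above with the recommended per-client usage:
def _keep_interface_example(data):
    # Deprecated: reuses a global KeepClient built from the current
    # Arvados configuration.
    locator = Keep.put(data)
    # Preferred: instantiate your own KeepClient against your own API client.
    kc = KeepClient(api_client=arvados.api('v1'))
    return kc.get(locator)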


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content
            # is None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
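

# Illustrative sketch of the reserve/fill protocol that KeepClient.get() uses
# with KeepBlockCache (fetch is a hypothetical callable; this function is not
# used by the SDK):
def _block_cache_example(cache, locator, fetch):
    slot, first = cache.reserve_cache(locator)
    if not first:
        # Another thread already reserved this block; wait until it's ready.
        return slot.get()
    content = fetch(locator)  # may return None on failure
    slot.set(content)         # stores the block and wakes any waiting threads
    cache.cap_cache()         # evict blocks once the cache exceeds cache_max
    return content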


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:      300 seconds
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:       300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)

    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}.  When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._response = None
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()

        def __enter__(self):
            self._todo_lock.acquire()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff.  Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server.  It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done
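
    # Illustrative sketch (comment only) of the protocol each KeepWriterThread
    # follows with a shared ThreadLimiter; do_put here is hypothetical:
    #
    #   limiter = KeepClient.ThreadLimiter(copies)
    #   # ... in each writer thread:
    #   with limiter as l:
    #       if l.shall_i_proceed():
    #           body, replicas = do_put()
    #           l.save_response(body, replicas)
    #   # ... back in the caller, after joining the threads:
    #   limiter.done()      # total replicas reported stored
    #   limiter.response()  # body of the most recently saved response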

    class KeepService(object):
        # Make requests to a single Keep service, and track results.
        HTTP_ERRORS = (requests.exceptions.RequestException,
                       socket.error, ssl.SSLError)

        def __init__(self, root, session, **headers):
            self.root = root
            self.last_result = None
            self.success_flag = None
            self.session = session
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

        def usable(self):
            return self.success_flag is not False

        def finished(self):
            return self.success_flag is not None

        def last_status(self):
            try:
                return self.last_result.status_code
            except AttributeError:
                return None

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            try:
                with timer.Timer() as t:
                    result = self.session.get(url.encode('utf-8'),
                                              headers=self.get_headers,
                                              timeout=timeout)
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(e), str(e))
                self.last_result = e
            else:
                self.last_result = result
                self.success_flag = retry.check_http_response_success(result)
                content = result.content
                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                             self.last_status(), len(content), t.msecs,
                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
                if self.success_flag:
                    resp_md5 = hashlib.md5(content).hexdigest()
                    if resp_md5 == locator.md5sum:
                        return content
                    _logger.warning("Checksum fail: md5(%s) = %s",
                                    url, resp_md5)
            return None

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            try:
                result = self.session.put(url.encode('utf-8'),
                                          data=body,
                                          headers=self.put_headers,
                                          timeout=timeout)
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(e), str(e))
                self.last_result = e
            else:
                self.last_result = result
                self.success_flag = retry.check_http_response_success(result)
            return self.success_flag
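
    # Illustrative lifecycle (comment only): one KeepService per service root,
    # reused across retries.  The root URL and token here are hypothetical:
    #
    #   ks = KeepClient.KeepService('https://keep0.xyzzy.arvadosapi.com/',
    #                               session, Authorization='OAuth2 token')
    #   blob = ks.get(locator, timeout=(2, 300))  # None on failure
    #   ks.usable()    # False only after a permanent failure
    #   ks.finished()  # True once any request has completed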

    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server.  On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            status = self.service.last_status()
            if self._success:
                result = self.service.last_result
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas the
                # server reports it stored, to handle the case where we're
                # talking to a proxy or other backend that stores multiple
                # copies for us.
                try:
                    replicas_stored = int(result.headers['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result.content.strip(), replicas_stored)
            elif status is not None:
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'], status,
                              self.service.last_result.content)

    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout).  The behavior described
          above for adjusting connection timeouts on retry also applies.

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          The requests.Session object to use for get() and put() requests.
          Will create one if not specified.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.session = session if session is not None else requests.Session()
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._keep_services = [{
                    'uuid': 'proxy',
                    '_service_root': proxy,
                    }]
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._keep_services = None
                self.using_proxy = None
                self._static_services_list = False
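
    # Illustrative construction sketches (comment only; the host, token, and
    # directory are hypothetical):
    #
    #   kc = KeepClient()                  # discover services via arvados.api('v1')
    #   kc = KeepClient(proxy='https://keep.xyzzy.arvadosapi.com/',
    #                   api_token='token')  # talk only to a Keep proxy
    #   kc = KeepClient(local_store='/tmp/keep')  # testing: use local files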

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient.  See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        return (t[0] * (1 << attempt_number), t[1])
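
    # For example, with DEFAULT_TIMEOUT == (2, 300) the connection timeout
    # doubles on each attempt while the read timeout stays fixed:
    #   current_timeout(0) -> (2, 300)
    #   current_timeout(1) -> (4, 300)
    #   current_timeout(2) -> (8, 300)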

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            self._keep_services = keep_services.execute().get('items')
            if not self._keep_services:
                raise arvados.errors.NoKeepServersError()

            self.using_proxy = any(ks.get('service_type') == 'proxy'
                                   for ks in self._keep_services)

            # Precompute the base URI for each service.
            for r in self._keep_services:
                r['_service_root'] = "{}://[{}]:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    r['service_host'],
                    r['service_port'])
            _logger.debug(str(self._keep_services))

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()

    def weighted_service_roots(self, data_hash, force_rebuild=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash.
        """
        self.build_services_list(force_rebuild)

        # Sort the available services by weight (heaviest first) for
        # this data_hash, and return their service_roots (base URIs)
        # in that order.
        sorted_roots = [
            svc['_service_root'] for svc in sorted(
                self._keep_services,
                reverse=True,
                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
        _logger.debug(data_hash + ': ' + str(sorted_roots))
        return sorted_roots
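
    # This is rendezvous (highest-random-weight) hashing: every client
    # computes the same per-service weight md5(data_hash + uuid[-15:]) and
    # probes the services heaviest-first, so readers try servers in the same
    # order that writers preferred them, without any shared state.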

    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # service root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(root, self.session, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        expect_hash = locator.md5sum

        slot, first = self.block_cache.reserve_cache(expect_hash)
        if not first:
            v = slot.get()
            return v

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@')]
        # Map root URLs to their KeepService objects.
        roots_map = {root: self.KeepService(root, self.session)
                     for root in hint_roots}
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                local_roots = self.map_new_services(
                    roots_map, expect_hash,
                    force_rebuild=(tries_left < num_retries))
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in (local_roots + hint_roots)
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        try:
            all_roots = local_roots + hint_roots
        except NameError:
            # We never successfully fetched local_roots.
            all_roots = hint_roots
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in all_roots
                         if roots_map[key].last_status() in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result)
                          for key in all_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(all_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        if copies < 1:
            return data_hash

        headers = {}
        if self.using_proxy:
            # Tell the proxy how many copies we want it to store
            headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        thread_limiter = KeepClient.ThreadLimiter(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                local_roots = self.map_new_services(
                    roots_map, data_hash,
                    force_rebuild=(tries_left < num_retries), **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            threads = []
            for service_root, ks in roots_map.iteritems():
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result)
                              for key in local_roots
                              if not roots_map[key].success_flag)
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
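
    # Illustrative usage (comment only): put() returns a signed locator whose
    # exact signature and expiry depend on the server configuration:
    #
    #   locator = kc.put('foo', copies=2)
    #   # e.g. 'acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@<expiry>'
    #   kc.get(locator)  # -> 'foo'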

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()
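
    # Illustrative round trip in local-store mode (the directory is
    # hypothetical; 'acbd...' is the md5 of 'foo'):
    #
    #   kc = KeepClient(local_store='/tmp/keep')
    #   loc = kc.put('foo')  # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #   kc.get(loc)          # -> 'foo'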

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)