25 import arvados.config as config
27 import arvados.retry as retry
31 # Workaround for urllib3 bug.
32 # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
33 # However, urllib3 prior to version 1.10 has a major bug in this feature
34 # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
35 # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
36 # following workaround is necessary to be able to use
37 # the arvados python sdk with the distribution-provided packages.
from pkg_resources import parse_version
# Undo requests' pyopenssl injection on urllib3 < 1.10 (see the workaround
# note above); newer urllib3 releases have the WantWriteError fix.
if parse_version(urllib3.__version__) < parse_version('1.10'):
    from urllib3.contrib import pyopenssl
    pyopenssl.extract_from_urllib3()
# Module-level logger; callers configure handlers and levels.
_logger = logging.getLogger('arvados.keep')
# Lazily-built shared KeepClient used by Keep.global_client_object().
global_client_object = None
class KeepLocator(object):
    """Parse and render a Keep block locator string.

    A locator has the form "<md5sum>[+<size>][+<hint>...]"; hints starting
    with "A" carry a permission signature and hex expiry timestamp.
    """
    # Reference point for converting permission-hint hex timestamps to
    # naive-UTC datetimes (and back).
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one capital letter followed by alphanumeric hint data.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        self.size = int(next(pieces))
        # Validate and dispatch each remaining "+"-separated hint.
        if self.HINT_RE.match(hint) is None:
            raise ValueError("unrecognized hint data {}".format(hint))
        elif hint.startswith('A'):
            # "A" hints hold the permission signature and expiry.
            self.parse_permission_hint(hint)
        self.hints.append(hint)
        # __str__: join the non-None components back with "+".
        str(s) for s in [self.md5sum, self.size,
                         self.permission_hint()] + self.hints
        # stripped(): locator without hints, for cache keys and lookups.
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # getter: read the stored hex string back.
        return getattr(self, data_name)
        def setter(self, hex_str):
            # Reject anything that is not exactly `length` hex digits.
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    # Validated hex-string properties built by the factory above.
    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    def perm_expiry(self):
        # Property getter: expiry as a naive-UTC datetime, or None.
        return self._perm_expiry

    def perm_expiry(self, value):
        # Property setter: `value` is a 1-8 digit hex Unix timestamp.
        if not arvados.util.is_hex(value, 1, 8):
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Render the "A<signature>@<hex timestamp>" hint, if signed."""
        data = [self.perm_sig, self.perm_expiry]
        # Convert the datetime back to integer seconds since the epoch.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Split an "A" hint into its signature and expiry parts."""
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        raise ValueError("bad permission hint {}".format(s))
124 def permission_expired(self, as_of_dt=None):
125 if self.perm_expiry is None:
127 elif as_of_dt is None:
128 as_of_dt = datetime.datetime.now()
129 return self.perm_expiry <= as_of_dt
133 """Simple interface to a global KeepClient object.
135 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
136 own API client. The global KeepClient will build an API client from the
137 current Arvados configuration, which may not match the one you built.
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed since the last call.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
        return global_client_object
    def get(locator, **kwargs):
        """Fetch data via the shared global KeepClient (deprecated path)."""
        return Keep.global_client_object().get(locator, **kwargs)
    def put(data, **kwargs):
        """Store data via the shared global KeepClient (deprecated path)."""
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, bounded by total byte size."""
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        # Maximum total size, in bytes, of cached block contents.
        self.cache_max = cache_max
        # Guards _cache; all list manipulation happens under this lock.
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait on `ready` until set() runs."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()

        def set(self, value):
            # size(): a slot with no content (error or still loading)
            # contributes nothing to the cache total.
            if self.content is None:
                return len(self.content)

        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict from the tail (least recently used) end, but only
                # slots whose content has finished loading.
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # move it to the front
                self._cache.insert(0, n)

    def get(self, locator):
        """Return the CacheSlot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
class KeepClient(object):
    """Client for storing and retrieving data blocks from Keep services."""

    # Timeouts are (connection_timeout, read_timeout) tuples, in seconds,
    # as interpreted by the requests library.
    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)
    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._response = None
            self._todo_lock = threading.Semaphore(todo)
            self._done_lock = threading.Lock()
            # NOTE(review): this acquire is the context-manager entry; it
            # pairs with the release in __exit__ below.
            self._todo_lock.acquire()

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server.  It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

            # response(): returns the body from the response to a PUT
            # request, saved above.
            with self._done_lock:
                return self._response

            # done(): return how many successes were reported.
            with self._done_lock:
    class KeepService(object):
        # Make requests to a single Keep service, and track results.
        #
        # Exceptions that count as a transport failure, as opposed to an
        # HTTP-level error response.
        HTTP_ERRORS = (requests.exceptions.RequestException,
                       socket.error, ssl.SSLError)

        def __init__(self, root, session, **headers):
            # root is the service's base URI; `headers` are sent with every
            # request (PUTs send them as-is, GETs add an Accept header).
            self.last_result = None
            self.success_flag = None
            self.session = session
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

            # usable(): service has not reported a permanent failure yet.
            return self.success_flag is not False

            # finished(): some request has completed (success or failure).
            return self.success_flag is not None

        def last_status(self):
            # HTTP status code of the most recent request, if there was one.
                return self.last_result.status_code
            except AttributeError:

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            with timer.Timer() as t:
                result = self.session.get(url.encode('utf-8'),
                                          headers=self.get_headers,
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(e), str(e))
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            content = result.content
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self.last_status(), len(content), t.msecs,
                         (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.success_flag:
                # Guard against a successful status with a corrupted body:
                # verify the content's md5 against the locator.
                resp_md5 = hashlib.md5(content).hexdigest()
                if resp_md5 == locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",

        def put(self, hash_s, body, timeout=None):
            # Store `body` under the given hash; returns the success flag.
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
                result = self.session.put(url.encode('utf-8'),
                                          headers=self.put_headers,
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(e), str(e))
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            return self.success_flag
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self._success = False

            # run(): wait for a ThreadLimiter turn, then attempt the write.
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me, so I am done here.
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            # Skip services that already completed a request this round.
            if self.service.finished():
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                timeout=self.args.get('timeout', None)))
            status = self.service.last_status()
            result = self.service.last_result
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replicas
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores multiple copies for us.
                replicas_stored = int(result.headers['x-keep-replicas-stored'])
            except (KeyError, ValueError):
            limiter.save_response(result.content.strip(), replicas_stored)
            elif status is not None:
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'], status,
                              self.service.last_result.content)
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        * api_client: The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.
        * proxy: If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.
        * timeout: The timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
        * proxy_timeout: The timeout (in seconds) for HTTP requests to
          Keep proxies.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout).  Default: (20, 300).
        * api_token: If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.
        * local_store: If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.
        * num_retries: The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        * session: The requests.Session object to use for get() and put()
          requests.  Will create one if not specified.
        """
        self.lock = threading.Lock()
        # Fall back to the configured proxy when none was given explicitly.
        proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
                api_token = api_client.api_token
        elif api_client is not None:
            # Supplying both an api_client and an api_token is an error.
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout

        # Local-store mode: replace get/put with filesystem-backed stubs.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
        self.num_retries = num_retries
        self.session = session if session is not None else requests.Session()
        # Proxy mode: use a static, single-entry service list.
        if not proxy.endswith('/'):
        self.api_token = api_token
        self._keep_services = [{
            '_service_root': proxy,
        self.using_proxy = True
        self._static_services_list = True
        # It's important to avoid instantiating an API client
        # unless we actually need one, for testing's sake.
        if api_client is None:
            api_client = arvados.api('v1')
        self.api_client = api_client
        self.api_token = api_client.api_token
        self._keep_services = None
        self.using_proxy = None
        self._static_services_list = False
    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.
        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        # Double the connection timeout on each successive attempt; the
        # read timeout stays fixed.
        return (t[0] * (1 << attempt_number), t[1])
    def build_services_list(self, force_rebuild=False):
        """Populate self._keep_services from the API server.

        Does nothing when a static service list is in use (proxy mode) or
        a cached list exists and force_rebuild is false.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        self._keep_services = keep_services.execute().get('items')
        if not self._keep_services:
            raise arvados.errors.NoKeepServersError()

        # Remember whether any service is a proxy, for timeout selection.
        self.using_proxy = any(ks.get('service_type') == 'proxy'
                               for ks in self._keep_services)

        # Precompute the base URI for each service.
        for r in self._keep_services:
            r['_service_root'] = "{}://[{}]:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
        _logger.debug(str(self._keep_services))
573 def _service_weight(self, data_hash, service_uuid):
574 """Compute the weight of a Keep service endpoint for a data
575 block with a known hash.
577 The weight is md5(h + u) where u is the last 15 characters of
578 the service endpoint's UUID.
580 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, data_hash, force_rebuild=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash.
        """
        self.build_services_list(force_rebuild)
        # Sort the available services by weight (heaviest first) for
        # this data_hash, and return their service_roots (base URIs)
        # as a list.
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
        _logger.debug(data_hash + ': ' + str(sorted_roots))
    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # service roots.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
        for root in local_roots:
            if root not in roots_map:
                # New service: build a KeepService that shares our session.
                roots_map[root] = self.KeepService(root, self.session, **headers)
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
630 def get_from_cache(self, loc):
631 """Fetch a block only if is in the cache, otherwise return None."""
632 slot = self.block_cache.get(loc)
633 if slot.ready.is_set():
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        # Multiple locators: recurse on each and concatenate the blocks.
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        expect_hash = locator.md5sum
        # Reserve (or join) the cache slot for this block before fetching.
        slot, first = self.block_cache.reserve_cache(expect_hash)

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@')]
        # Map root URLs to their KeepService objects.
        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                local_roots = self.map_new_services(
                    roots_map, expect_hash,
                    force_rebuild=(tries_left < num_retries))
            except Exception as error:
                loop.save_result(error)

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in (local_roots + hint_roots)
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
            self.block_cache.cap_cache()

        # Build the error report from every service we contacted.
        all_roots = local_roots + hint_roots
            # We never successfully fetched local_roots.
            all_roots = hint_roots
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in all_roots
                         if roots_map[key].last_status() in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result)
                          for key in all_roots)
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(all_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """
        # Normalize to a byte string; only ASCII-safe text is accepted.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        # The limiter stops spawning work once `copies` successes arrive.
        thread_limiter = KeepClient.ThreadLimiter(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        for tries_left in loop:
                local_roots = self.map_new_services(
                    roots_map, data_hash,
                    force_rebuild=(tries_left < num_retries), **headers)
            except Exception as error:
                loop.save_result(error)

            # Start one writer thread per known service root.
            for service_root, ks in roots_map.iteritems():
                t = KeepClient.KeepWriterThread(
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left))
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

            return thread_limiter.response()
        raise arvados.errors.KeepWriteError(
            "failed to write {}: no Keep services available ({})".format(
                data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result)
                          for key in local_roots
                          if not roots_map[key].success_flag)
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, thread_limiter.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a temp file, then rename into place for atomicity.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put(); reads a block back from disk."""
        locator = KeepLocator(loc_s)
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The well-known empty block never hits the filesystem.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
837 def is_cached(self, locator):
838 return self.block_cache.reserve_cache(expect_hash)