25 import arvados.config as config
27 import arvados.retry as retry
# Workaround for urllib3 bug.
# The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
# However, urllib3 prior to version 1.10 has a major bug in this feature
# (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
# Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
# following workaround is necessary to be able to use
# the arvados python sdk with the distribution-provided packages.
from pkg_resources import parse_version
if parse_version(urllib3.__version__) < parse_version('1.10'):
    # extract_from_urllib3() undoes the pyopenssl monkey-patching that
    # requests installs, avoiding the WantWriteError bug on old urllib3.
    from urllib3.contrib import pyopenssl
    pyopenssl.extract_from_urllib3()
# Module-level logger; callers configure handlers/levels externally.
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton used by the deprecated Keep.* helpers below.
global_client_object = None
class KeepLocator(object):
    """Parse and re-serialize one Keep locator: "<md5>+<size>[+hints...]"."""
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)  # Unix epoch as a naive UTC datetime
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')  # one "+hint" field, e.g. "A<sig>@<expiry>"

    def __init__(self, locator_str):
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        self.size = int(next(pieces))
        # Remaining '+'-separated pieces are hints; 'A...' carries the
        # permission signature, other hints are kept verbatim.
        if self.HINT_RE.match(hint) is None:
            raise ValueError("unrecognized hint data {}".format(hint))
        elif hint.startswith('A'):
            self.parse_permission_hint(hint)
        self.hints.append(hint)

        # __str__ fragment: rejoin md5, size, permission hint, other hints.
        str(s) for s in [self.md5sum, self.size,
                         self.permission_hint()] + self.hints

        # stripped-locator fragment: "<md5>+<size>" without hint fields.
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        # getter body: read the backing '_<name>' attribute.
        return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)      # exactly 32 hex digits
    perm_sig = _make_hex_prop('perm_sig', 40)  # exactly 40 hex digits

    def perm_expiry(self):
        # Getter: permission-hint expiry as a naive UTC datetime, or None.
        # NOTE(review): presumably wrapped by @property (decorator not shown).
        return self._perm_expiry

    def perm_expiry(self, value):
        # Setter: value is a hex-encoded Unix timestamp (1-8 hex digits).
        if not arvados.util.is_hex(value, 1, 8):
            "permission timestamp must be a hex Unix timestamp: {}".
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        # Encode expiry as whole seconds since the epoch, zero-padded hex.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        # 'A<signature>@<hex timestamp>' -> (perm_sig, perm_expiry).
        self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        raise ValueError("bad permission hint {}".format(s))
124 def permission_expired(self, as_of_dt=None):
125 if self.perm_expiry is None:
127 elif as_of_dt is None:
128 as_of_dt = datetime.datetime.now()
129 return self.perm_expiry <= as_of_dt
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """

    def global_client_object(cls):
        # Presumably a @classmethod (takes cls; decorator not visible here).
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            # NOTE(review): nothing visible here assigns cls._last_key = key,
            # so the staleness check above would always rebuild -- confirm
            # against the complete file.
        return global_client_object

    def get(locator, **kwargs):
        # No self/cls parameter: presumably a @staticmethod delegating to
        # the shared KeepClient.
        return Keep.global_client_object().get(locator, **kwargs)

    def put(data, **kwargs):
        # No self/cls parameter: presumably a @staticmethod delegating to
        # the shared KeepClient.
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    # In-memory cache of Keep block contents shared by KeepClient readers.
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max  # soft size cap, in bytes (see cap_cache)
        self._cache_lock = threading.Lock()  # guards the slot list

    class CacheSlot(object):
        # One cache entry; content arrives later and readers wait on `ready`.
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()  # set once content (or failure) is known

        def set(self, value):

        # size() fragment: a slot with no content counts as zero bytes.
        if self.content is None:
            return len(self.content)

    '''Cap the cache size to self.cache_max'''
    with self._cache_lock:
        # Select all slots except those where ready.is_set() and content is
        # None (that means there was an error reading the block).
        self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
        sm = sum([slot.size() for slot in self._cache])
        while len(self._cache) > 0 and sm > self.cache_max:
            # Walk from the tail (least recently used) and evict ready slots.
            for i in xrange(len(self._cache)-1, -1, -1):
                if self._cache[i].ready.is_set():
            sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                # move it to the front
                self._cache.insert(0, n)
220 def get(self, locator):
221 with self._cache_lock:
222 return self._get(locator)
    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        # NOTE(review): KeepClient.get() unpacks the result as
        # (slot, first), so this presumably returns the slot plus a flag
        # saying whether it was newly created -- return statements are not
        # visible here; confirm against the complete file.
        with self._cache_lock:
            n = self._get(locator)
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
class KeepClient(object):
    # Client for reading and writing data blocks to/from Keep services.

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    DEFAULT_TIMEOUT = (2, 300)         # (connect, read) seconds, requests-style tuple
    DEFAULT_PROXY_TIMEOUT = (20, 300)  # proxies get a longer connect window
    class ThreadLimiter(object):
        """
        Limit the number of threads running at a given time to
        {desired successes} minus {successes reported}. When successes
        reported == desired, wake up the remaining threads and tell
        them to quit.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._response = None                        # body of the first successful PUT
            self._todo_lock = threading.Semaphore(todo)  # at most `todo` writers in flight
            self._done_lock = threading.Lock()           # guards _done/_response

            # __enter__ fragment: block until a semaphore slot is free.
            self._todo_lock.acquire()

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server. It is not necessary to save more than
            one response, since we presume that any locator returned
            in response to a successful request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

            # response() fragment:
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

            # done() fragment:
            """
            Return how many successes were reported.
            """
            with self._done_lock:
    class KeepService(object):
        # Make requests to a single Keep service, and track results.
        HTTP_ERRORS = (requests.exceptions.RequestException,
                       socket.error, ssl.SSLError)

        def __init__(self, root, session, **headers):
            self.last_result = None    # most recent requests.Response, if any
            self.success_flag = None   # None = untried, True = ok, False = failed
            self.session = session
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

            # usable() fragment: a service stays usable until it fails.
            return self.success_flag is not False

            # finished() fragment: True once any request has completed.
            return self.success_flag is not None

        def last_status(self):
            # HTTP status code of the most recent request; AttributeError
            # is raised (and presumably caught) while last_result is None.
                return self.last_result.status_code
            except AttributeError:

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
                with timer.Timer() as t:
                    result = self.session.get(url.encode('utf-8'),
                                              headers=self.get_headers,
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(e), str(e))
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            content = result.content
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self.last_status(), len(content), t.msecs,
                         (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.success_flag:
                # Verify content integrity before trusting the block.
                resp_md5 = hashlib.md5(content).hexdigest()
                if resp_md5 == locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
                result = self.session.put(url.encode('utf-8'),
                                          headers=self.put_headers,
            except self.HTTP_ERRORS as e:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(e), str(e))
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            return self.success_flag
    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self._success = False

        # run() fragment:
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            # Skip services that have already completed a request.
            if self.service.finished():
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                timeout=self.args.get('timeout', None)))
            status = self.service.last_status()
            result = self.service.last_result
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
                replicas_stored = int(result.headers['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header missing or unparseable.
            limiter.save_response(result.content.strip(), replicas_stored)
            elif status is not None:
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'], status,
                              self.service.last_result.content)
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        :api_client:
          The API client to use to find Keep services. If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy. Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The timeout (in seconds) for HTTP requests to Keep
          non-proxy servers. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.

        :proxy_timeout:
          The timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). Default: (20, 300).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests. It is an error to specify both
          api_client and api_token. If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory. If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable. If you want to ensure KeepClient does not
          use local storage, pass in an empty string. This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called. Default 0.

        :session:
          The requests.Session object to use for get() and put() requests.
          Will create one if not specified.
        """
        self.lock = threading.Lock()
        # Fall back to the configured proxy (presumably only when the
        # caller passed proxy=None; guard not visible here).
        proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
                # else-branch fragment: reuse the API client's token.
                api_token = api_client.api_token
        elif api_client is not None:
            # Mixing an explicit token with an API client is ambiguous.
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout

        # Local-store mode: swap get/put for filesystem-backed versions.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put

        self.num_retries = num_retries
        self.session = session if session is not None else requests.Session()
        # Proxy mode: treat the proxy as a single static "service".
        if not proxy.endswith('/'):
        self.api_token = api_token
        self._keep_services = [{
            '_service_root': proxy,
        self.using_proxy = True
        self._static_services_list = True
        # It's important to avoid instantiating an API client
        # unless we actually need one, for testing's sake.
        if api_client is None:
            api_client = arvados.api('v1')
        self.api_client = api_client
        self.api_token = api_client.api_token
        self._keep_services = None
        self.using_proxy = None
        self._static_services_list = False
533 def current_timeout(self):
534 """Return the appropriate timeout to use for this client: the proxy
535 timeout setting if the backend service is currently a proxy,
536 the regular timeout setting otherwise.
538 # TODO(twp): the timeout should be a property of a
539 # KeepService, not a KeepClient. See #4488.
540 return self.proxy_timeout if self.using_proxy else self.timeout
    def build_services_list(self, force_rebuild=False):
        # Refresh self._keep_services from the API server, unless a static
        # (proxy) list is in use or a cached list is still acceptable.
        if (self._static_services_list or
            (self._keep_services and not force_rebuild)):
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        self._keep_services = keep_services.execute().get('items')
        if not self._keep_services:
            raise arvados.errors.NoKeepServersError()

        # Proxy timeouts apply when any listed service is a proxy.
        self.using_proxy = any(ks.get('service_type') == 'proxy'
                               for ks in self._keep_services)

        # Precompute the base URI for each service.
        for r in self._keep_services:
            r['_service_root'] = "{}://[{}]:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
        _logger.debug(str(self._keep_services))
567 def _service_weight(self, data_hash, service_uuid):
568 """Compute the weight of a Keep service endpoint for a data
569 block with a known hash.
571 The weight is md5(h + u) where u is the last 15 characters of
572 the service endpoint's UUID.
574 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
    def weighted_service_roots(self, data_hash, force_rebuild=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash.
        """
        self.build_services_list(force_rebuild)

        # Sort the available services by weight (heaviest first) for
        # this data_hash, and return their service_roots (base URIs)
        # in that order.
            svc['_service_root'] for svc in sorted(
                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
        _logger.debug(data_hash + ': ' + str(sorted_roots))

    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects. Poll for Keep services, and add any
        # new ones to roots_map. Return the current list of local
        # service roots.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(root, self.session, **headers)

    def _check_loop_result(result):
        # Presumably a @staticmethod: no self parameter.
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
        result, tried_server_count = result
        if (result is not None) and (result is not False):
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")

    def get_from_cache(self, loc):
        """Fetch a block only if is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot.ready.is_set():
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep. It
        sends a request each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence. As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator. The default value
          is set when the KeepClient is initialized.
        """
        # Multiple locators: fetch each one and concatenate.
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        expect_hash = locator.md5sum
        slot, first = self.block_cache.reserve_cache(expect_hash)

        # See #3147 for a discussion of the loop implementation. Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@')]
        # Map root URLs their KeepService objects.
        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        connect_timeout_scale = 1
        for tries_left in loop:
                local_roots = self.map_new_services(
                    roots_map, expect_hash,
                    force_rebuild=(tries_left < num_retries))
            except Exception as error:
                loop.save_result(error)

            # Double the connect timeout on every retry (read timeout fixed).
            reader_timeout = (self.current_timeout()[0] * connect_timeout_scale, self.current_timeout()[1])
            connect_timeout_scale *= 2

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in (local_roots + hint_roots)
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=reader_timeout)
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
            self.block_cache.cap_cache()

        # Failure path: build a helpful error from per-service results.
        all_roots = local_roots + hint_roots
            # We never successfully fetched local_roots.
            all_roots = hint_roots
        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in all_roots
                         if roots_map[key].last_status() in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result)
                          for key in all_roots)
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(all_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread. Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body. If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff. The default value is set when the
          KeepClient is initialized.
        """
        # Keep stores byte strings; coerce unicode, reject other types.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")

        data_hash = hashlib.md5(data).hexdigest()

        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)

        thread_limiter = KeepClient.ThreadLimiter(copies)
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
        connect_timeout_scale = 1
        for tries_left in loop:
                local_roots = self.map_new_services(
                    roots_map, data_hash,
                    force_rebuild=(tries_left < num_retries), **headers)
            except Exception as error:
                loop.save_result(error)

            # Double the connect timeout on every retry (read timeout fixed).
            writer_timeout = (self.current_timeout()[0] * connect_timeout_scale, self.current_timeout()[1])
            connect_timeout_scale *= 2

            # Spawn one writer thread per Keep service.
            for service_root, ks in roots_map.iteritems():
                t = KeepClient.KeepWriterThread(
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=writer_timeout)
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        # Success: return the body (a signed locator) of a PUT response.
            return thread_limiter.response()
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        service_errors = ((key, roots_map[key].last_result)
                          for key in local_roots
                          if not roots_map[key].success_flag)
        raise arvados.errors.KeepWriteError(
            "failed to write {} (wanted {} copies but wrote {})".format(
                data_hash, copies, thread_limiter.done()), service_errors, label="service")
    def local_store_put(self, data, copies=1, num_retries=None):
        """Write data to the directory named by self.local_store.

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        # Write to a '.tmp' name first, then rename into place, so readers
        # never observe a partially written block.
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
            locator = KeepLocator(loc_s)
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        # The well-known empty block is served without touching the disk.
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
839 def is_cached(self, locator):
840 return self.block_cache.reserve_cache(expect_hash)