25 import arvados.config as config
27 import arvados.retry as retry
# Module-level logger shared by every class in this file.
30 _logger = logging.getLogger('arvados.keep')
# Lazily-built singleton used by the deprecated Keep.* helpers; rebuilt by
# Keep.global_client_object() whenever the relevant configuration changes.
31 global_client_object = None
33 class KeepLocator(object):
34 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37 def __init__(self, locator_str):
40 self._perm_expiry = None
41 pieces = iter(locator_str.split('+'))
42 self.md5sum = next(pieces)
44 self.size = int(next(pieces))
48 if self.HINT_RE.match(hint) is None:
49 raise ValueError("unrecognized hint data {}".format(hint))
50 elif hint.startswith('A'):
51 self.parse_permission_hint(hint)
53 self.hints.append(hint)
57 str(s) for s in [self.md5sum, self.size,
58 self.permission_hint()] + self.hints
62 if self.size is not None:
63 return "%s+%i" % (self.md5sum, self.size)
67 def _make_hex_prop(name, length):
68 # Build and return a new property with the given name that
69 # must be a hex string of the given length.
70 data_name = '_{}'.format(name)
72 return getattr(self, data_name)
73 def setter(self, hex_str):
74 if not arvados.util.is_hex(hex_str, length):
75 raise ValueError("{} must be a {}-digit hex string: {}".
76 format(name, length, hex_str))
77 setattr(self, data_name, hex_str)
78 return property(getter, setter)
80 md5sum = _make_hex_prop('md5sum', 32)
81 perm_sig = _make_hex_prop('perm_sig', 40)
84 def perm_expiry(self):
85 return self._perm_expiry
88 def perm_expiry(self, value):
89 if not arvados.util.is_hex(value, 1, 8):
91 "permission timestamp must be a hex Unix timestamp: {}".
93 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
95 def permission_hint(self):
96 data = [self.perm_sig, self.perm_expiry]
99 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
100 return "A{}@{:08x}".format(*data)
102 def parse_permission_hint(self, s):
104 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
106 raise ValueError("bad permission hint {}".format(s))
108 def permission_expired(self, as_of_dt=None):
109 if self.perm_expiry is None:
111 elif as_of_dt is None:
112 as_of_dt = datetime.datetime.now()
113 return self.perm_expiry <= as_of_dt
117 """Simple interface to a global KeepClient object.
119 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
120 own API client. The global KeepClient will build an API client from the
121 current Arvados configuration, which may not match the one you built.
126 def global_client_object(cls):
127 global global_client_object
128 # Previously, KeepClient would change its behavior at runtime based
129 # on these configuration settings. We simulate that behavior here
130 # by checking the values and returning a new KeepClient if any of
132 key = (config.get('ARVADOS_API_HOST'),
133 config.get('ARVADOS_API_TOKEN'),
134 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
135 config.get('ARVADOS_KEEP_PROXY'),
136 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
137 os.environ.get('KEEP_LOCAL_STORE'))
138 if (global_client_object is None) or (cls._last_key != key):
139 global_client_object = KeepClient()
141 return global_client_object
144 def get(locator, **kwargs):
145 return Keep.global_client_object().get(locator, **kwargs)
148 def put(data, **kwargs):
149 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, most-recently-used first.

    A CacheSlot is created (via reserve_cache) before the block data
    arrives; readers block on slot.ready until the fetcher calls
    slot.set().  cap_cache() evicts least-recently-used completed slots
    until the total size fits under cache_max.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max          # soft size limit, in bytes
        self._cache = []                    # CacheSlot list, MRU first
        self._cache_lock = threading.Lock() # guards _cache

    class CacheSlot(object):
        """One block's locator plus content that may not have arrived yet.

        content stays None until set() is called, and remains None after
        a failed fetch (set(None)).
        """
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content is available, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block data and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached byte count (0 while pending or after an error)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            # BUGFIX: use range (works on Python 2 and 3; xrange does not
            # exist on Python 3, and laziness buys nothing here).
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least-recently-used slot whose data has
                # actually arrived; never drop a slot still being fetched.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # BUGFIX: every remaining slot is still pending, so
                    # nothing can be freed now; previously this case spun
                    # forever.
                    break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must hold
        # _cache_lock.
        for i in range(len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
# Client for reading and writing data blocks on Arvados Keep services,
# either directly or through a Keep proxy.
221 class KeepClient(object):
 223 # Default Keep server connection timeout: 2 seconds
 224 # Default Keep server read timeout: 300 seconds
 225 # Default Keep proxy connection timeout: 20 seconds
 226 # Default Keep proxy read timeout: 300 seconds
# Each tuple is (connection_timeout, read_timeout) in seconds, in the
# form accepted by requests (see the __init__ docstring below).
 227 DEFAULT_TIMEOUT = (2, 300)
 228 DEFAULT_PROXY_TIMEOUT = (20, 300)
230 class ThreadLimiter(object):
232 Limit the number of threads running at a given time to
233 {desired successes} minus {successes reported}. When successes
234 reported == desired, wake up the remaining threads and tell
237 Should be used in a "with" block.
239 def __init__(self, todo):
242 self._response = None
243 self._todo_lock = threading.Semaphore(todo)
244 self._done_lock = threading.Lock()
247 self._todo_lock.acquire()
250 def __exit__(self, type, value, traceback):
251 self._todo_lock.release()
253 def shall_i_proceed(self):
255 Return true if the current thread should do stuff. Return
256 false if the current thread should just stop.
258 with self._done_lock:
259 return (self._done < self._todo)
261 def save_response(self, response_body, replicas_stored):
263 Records a response body (a locator, possibly signed) returned by
264 the Keep server. It is not necessary to save more than
265 one response, since we presume that any locator returned
266 in response to a successful request is valid.
268 with self._done_lock:
269 self._done += replicas_stored
270 self._response = response_body
274 Returns the body from the response to a PUT request.
276 with self._done_lock:
277 return self._response
281 Return how many successes were reported.
283 with self._done_lock:
# One Keep server endpoint plus the outcome of the most recent request
# made to it.  success_flag is tri-state: None = not yet tried, True =
# last request succeeded, False = permanent failure.
# NOTE(review): this listing is elided — the def lines for usable() and
# finished(), the try statements, the self.root assignment, and several
# return/error paths are missing from view.
287 class KeepService(object):
288 # Make requests to a single Keep service, and track results.
289 HTTP_ERRORS = (requests.exceptions.RequestException,
290 socket.error, ssl.SSLError)
292 def __init__(self, root, session, **headers):
294 self.last_result = None
295 self.success_flag = None
296 self.session = session
# GETs request raw block data; caller-supplied headers (e.g.
# Authorization) are layered on top.
297 self.get_headers = {'Accept': 'application/octet-stream'}
298 self.get_headers.update(headers)
299 self.put_headers = headers
# usable(): the service has not permanently failed (None or True).
302 return self.success_flag is not False
# finished(): a definitive success/failure has been recorded.
305 return self.success_flag is not None
307 def last_status(self):
# AttributeError covers both "no request yet" (None) and exception
# objects stored as last_result.
309 return self.last_result.status_code
310 except AttributeError:
313 def get(self, locator, timeout=None):
314 # locator is a KeepLocator object.
315 url = self.root + str(locator)
316 _logger.debug("Request: GET %s", url)
# Timer measures transfer time for the throughput log line below.
318 with timer.Timer() as t:
319 result = self.session.get(url.encode('utf-8'),
320 headers=self.get_headers,
322 except self.HTTP_ERRORS as e:
323 _logger.debug("Request fail: GET %s => %s: %s",
324 url, type(e), str(e))
327 self.last_result = result
328 self.success_flag = retry.check_http_response_success(result)
329 content = result.content
330 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
331 self.last_status(), len(content), t.msecs,
332 (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
333 if self.success_flag:
# Verify payload integrity against the locator's md5 before returning.
334 resp_md5 = hashlib.md5(content).hexdigest()
335 if resp_md5 == locator.md5sum:
337 _logger.warning("Checksum fail: md5(%s) = %s",
341 def put(self, hash_s, body, timeout=None):
342 url = self.root + hash_s
343 _logger.debug("Request: PUT %s", url)
345 result = self.session.put(url.encode('utf-8'),
347 headers=self.put_headers,
349 except self.HTTP_ERRORS as e:
350 _logger.debug("Request fail: PUT %s => %s: %s",
351 url, type(e), str(e))
354 self.last_result = result
355 self.success_flag = retry.check_http_response_success(result)
356 return self.success_flag
# Uploads one block to one Keep service, reporting success through the
# shared ThreadLimiter so the parent put() can count replicas.
# NOTE(review): this listing is elided — the docstring delimiters,
# run()'s def line, several try statements, and some branch bodies are
# missing from view.
359 class KeepWriterThread(threading.Thread):
361 Write a blob of data to the given Keep server. On success, call
362 save_response() of the given ThreadLimiter to save the returned
365 def __init__(self, keep_service, **kwargs):
366 super(KeepClient.KeepWriterThread, self).__init__()
367 self.service = keep_service
# Remaining keyword arguments (data, data_hash, service_root,
# thread_limiter, timeout) are stashed for run_with_limiter().
369 self._success = False
# run(): take a limiter slot; skip the upload if enough replicas were
# already stored by sibling threads.
375 with self.args['thread_limiter'] as limiter:
376 if not limiter.shall_i_proceed():
377 # My turn arrived, but the job has been done without
380 self.run_with_limiter(limiter)
382 def run_with_limiter(self, limiter):
# A service that already returned a definitive result is not retried.
383 if self.service.finished():
385 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
386 str(threading.current_thread()),
387 self.args['data_hash'],
388 len(self.args['data']),
389 self.args['service_root'])
390 self._success = bool(self.service.put(
391 self.args['data_hash'],
393 timeout=self.args.get('timeout', None)))
394 status = self.service.last_status()
396 result = self.service.last_result
397 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
398 str(threading.current_thread()),
399 self.args['data_hash'],
400 len(self.args['data']),
401 self.args['service_root'])
402 # Tick the 'done' counter for the number of replica
403 # reported stored by the server, for the case that
404 # we're talking to a proxy or other backend that
405 # stores to multiple copies for us.
407 replicas_stored = int(result.headers['x-keep-replicas-stored'])
408 except (KeyError, ValueError):
# (elided) presumably falls back to a default replica count of 1 when
# the header is missing or unparsable — TODO confirm against full file.
410 limiter.save_response(result.content.strip(), replicas_stored)
411 elif status is not None:
412 _logger.debug("Request fail: PUT %s => %s %s",
413 self.args['data_hash'], status,
414 self.service.last_result.content)
# NOTE(review): this listing is elided — e.g. the guard implied before
# line 473 (presumably "if proxy is None:"), the else branches around
# lines 477/484/493, and parts of the proxy service-list literal are
# missing from view.
417 def __init__(self, api_client=None, proxy=None,
418 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
419 api_token=None, local_store=None, block_cache=None,
420 num_retries=0, session=None):
421 """Initialize a new KeepClient.
425 The API client to use to find Keep services. If not
426 provided, KeepClient will build one from available Arvados
430 If specified, this KeepClient will send requests to this Keep
431 proxy. Otherwise, KeepClient will fall back to the setting of the
432 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
433 KeepClient does not use a proxy, pass in an empty string.
436 The timeout (in seconds) for HTTP requests to Keep
437 non-proxy servers. A tuple of two floats is interpreted as
438 (connection_timeout, read_timeout): see
439 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
443 The timeout (in seconds) for HTTP requests to
444 Keep proxies. A tuple of two floats is interpreted as
445 (connection_timeout, read_timeout). Default: (20, 300).
448 If you're not using an API client, but only talking
449 directly to a Keep proxy, this parameter specifies an API token
450 to authenticate Keep requests. It is an error to specify both
451 api_client and api_token. If you specify neither, KeepClient
452 will use one available from the Arvados configuration.
455 If specified, this KeepClient will bypass Keep
456 services, and save data to the named directory. If unspecified,
457 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
458 environment variable. If you want to ensure KeepClient does not
459 use local storage, pass in an empty string. This is primarily
460 intended to mock a server for testing.
463 The default number of times to retry failed requests.
464 This will be used as the default num_retries value when get() and
465 put() are called. Default 0.
468 The requests.Session object to use for get() and put() requests.
469 Will create one if not specified.
471 self.lock = threading.Lock()
# (elided guard) proxy falls back to configuration when not given.
473 proxy = config.get('ARVADOS_KEEP_PROXY')
474 if api_token is None:
475 if api_client is None:
476 api_token = config.get('ARVADOS_API_TOKEN')
# (else) borrow the token from the supplied API client.
478 api_token = api_client.api_token
479 elif api_client is not None:
# Supplying both leaves it ambiguous which credential wins.
481 "can't build KeepClient with both API client and token")
482 if local_store is None:
483 local_store = os.environ.get('KEEP_LOCAL_STORE')
# Share one block cache unless the caller supplies their own.
485 self.block_cache = block_cache if block_cache else KeepBlockCache()
486 self.timeout = timeout
487 self.proxy_timeout = proxy_timeout
# local_store mode: route get/put to the filesystem-backed helpers
# (primarily a testing aid, per the docstring above).
490 self.local_store = local_store
491 self.get = self.local_store_get
492 self.put = self.local_store_put
494 self.num_retries = num_retries
495 self.session = session if session is not None else requests.Session()
# Proxy mode: a single static service entry rooted at the proxy URL.
497 if not proxy.endswith('/'):
499 self.api_token = api_token
500 self._keep_services = [{
502 '_service_root': proxy,
504 self.using_proxy = True
505 self._static_services_list = True
507 # It's important to avoid instantiating an API client
508 # unless we actually need one, for testing's sake.
509 if api_client is None:
510 api_client = arvados.api('v1')
511 self.api_client = api_client
512 self.api_token = api_client.api_token
# Service list is discovered lazily by build_services_list().
513 self._keep_services = None
514 self.using_proxy = None
515 self._static_services_list = False
517 def current_timeout(self):
518 """Return the appropriate timeout to use for this client: the proxy
519 timeout setting if the backend service is currently a proxy,
520 the regular timeout setting otherwise.
522 # TODO(twp): the timeout should be a property of a
523 # KeepService, not a KeepClient. See #4488.
524 return self.proxy_timeout if self.using_proxy else self.timeout
# Populate self._keep_services from the API server (no-op for static
# proxy lists or when a cached list exists and force_rebuild is False).
# NOTE(review): listing is elided — the early-return body, the try
# statement before line 532, and the host/port format arguments at
# lines 547-548 are missing from view.
526 def build_services_list(self, force_rebuild=False):
527 if (self._static_services_list or
528 (self._keep_services and not force_rebuild)):
532 keep_services = self.api_client.keep_services().accessible()
533 except Exception: # API server predates Keep services.
534 keep_services = self.api_client.keep_disks().list()
536 self._keep_services = keep_services.execute().get('items')
537 if not self._keep_services:
538 raise arvados.errors.NoKeepServersError()
# Remember whether any discovered service is a proxy, so
# current_timeout() can pick the right timeout pair.
540 self.using_proxy = any(ks.get('service_type') == 'proxy'
541 for ks in self._keep_services)
543 # Precompute the base URI for each service.
# Bracketed host form keeps IPv6 literals valid in URLs.
544 for r in self._keep_services:
545 r['_service_root'] = "{}://[{}]:{:d}/".format(
546 'https' if r['service_ssl_flag'] else 'http',
549 _logger.debug(str(self._keep_services))
551 def _service_weight(self, data_hash, service_uuid):
552 """Compute the weight of a Keep service endpoint for a data
553 block with a known hash.
555 The weight is md5(h + u) where u is the last 15 characters of
556 the service endpoint's UUID.
558 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
# NOTE(review): listing is elided — the docstring close, the sorted()
# iterable argument, reverse ordering flag (if any), and the return
# statement are missing from view.
560 def weighted_service_roots(self, data_hash, force_rebuild=False):
561 """Return an array of Keep service endpoints, in the order in
562 which they should be probed when reading or writing data with
# Refresh the service list first (respects cached/static lists).
565 self.build_services_list(force_rebuild)
567 # Sort the available services by weight (heaviest first) for
568 # this data_hash, and return their service_roots (base URIs)
571 svc['_service_root'] for svc in sorted(
# Rendezvous hashing: each service's weight depends on both the block
# hash and the service UUID (see _service_weight).
574 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
575 _logger.debug(data_hash + ': ' + str(sorted_roots))
# NOTE(review): listing is elided — the end of the leading comment and
# the return statement (presumably `return local_roots`) are missing
# from view.
578 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
579 # roots_map is a dictionary, mapping Keep service root strings
580 # to KeepService objects. Poll for Keep services, and add any
581 # new ones to roots_map. Return the current list of local
# Default auth header for every new KeepService; callers may override.
583 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
584 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
585 for root in local_roots:
# Keep existing KeepService objects so their success/failure history
# (success_flag) survives across retry rounds.
586 if root not in roots_map:
587 roots_map[root] = self.KeepService(root, self.session, **headers)
591 def _check_loop_result(result):
592 # KeepClient RetryLoops should save results as a 2-tuple: the
593 # actual result of the request, and the number of servers available
594 # to receive the request this round.
595 # This method returns True if there's a real result, False if
596 # there are no more servers available, otherwise None.
597 if isinstance(result, Exception):
599 result, tried_server_count = result
600 if (result is not None) and (result is not False):
602 elif tried_server_count < 1:
603 _logger.info("No more Keep services to try; giving up")
608 def get_from_cache(self, loc):
609 """Fetch a block only if is in the cache, otherwise return None."""
610 slot = self.block_cache.get(loc)
611 if slot.ready.is_set():
# NOTE(review): this listing is elided — the cache-hit branch after
# reserve_cache, the try statements inside the retry loop, the
# slot.set()/success-return sequence around lines 679-683, and the
# success test before line 685 are missing from view.
617 def get(self, loc_s, num_retries=None):
618 """Get data from Keep.
620 This method fetches one or more blocks of data from Keep. It
621 sends a request each Keep service registered with the API
622 server (or the proxy provided when this client was
623 instantiated), then each service named in location hints, in
624 sequence. As soon as one service provides the data, it's
628 * loc_s: A string of one or more comma-separated locators to fetch.
629 This method returns the concatenation of these blocks.
630 * num_retries: The number of times to retry GET requests to
631 *each* Keep server if it returns temporary failures, with
632 exponential backoff. Note that, in each loop, the method may try
633 to fetch data from every available Keep service, along with any
634 that are named in location hints in the locator. The default value
635 is set when the KeepClient is initialized.
# Multi-locator request: recurse once per locator and concatenate.
638 return ''.join(self.get(x) for x in loc_s.split(','))
639 locator = KeepLocator(loc_s)
640 expect_hash = locator.md5sum
# reserve_cache returns (slot, first); when first is False another
# thread is already fetching this block — the elided branch presumably
# waits on the existing slot instead of re-fetching. TODO confirm.
641 slot, first = self.block_cache.reserve_cache(expect_hash)
646 # See #3147 for a discussion of the loop implementation. Highlights:
647 # * Refresh the list of Keep services after each failure, in case
648 # it's being updated.
649 # * Retry until we succeed, we're out of retries, or every available
650 # service has returned permanent failure.
# "K@<site>" hints name remote clusters that may hold the block.
651 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
652 for hint in locator.hints if hint.startswith('K@')]
653 # Map root URLs their KeepService objects.
654 roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
656 loop = retry.RetryLoop(num_retries, self._check_loop_result,
658 for tries_left in loop:
# Rebuild the service list on every round after the first failure.
660 local_roots = self.map_new_services(
661 roots_map, expect_hash,
662 force_rebuild=(tries_left < num_retries))
663 except Exception as error:
664 loop.save_result(error)
667 # Query KeepService objects that haven't returned
668 # permanent failure, in our specified shuffle order.
669 services_to_try = [roots_map[root]
670 for root in (local_roots + hint_roots)
671 if roots_map[root].usable()]
672 for keep_service in services_to_try:
673 blob = keep_service.get(locator, timeout=self.current_timeout())
# Saving (blob, server_count) lets _check_loop_result decide whether
# to retry, give up, or succeed.
676 loop.save_result((blob, len(services_to_try)))
678 # Always cache the result, then return it if we succeeded.
680 self.block_cache.cap_cache()
# Failure reporting below: classify per-service outcomes.
685 all_roots = local_roots + hint_roots
687 # We never successfully fetched local_roots.
688 all_roots = hint_roots
689 # Q: Including 403 is necessary for the Keep tests to continue
690 # passing, but maybe they should expect KeepReadError instead?
691 not_founds = sum(1 for key in all_roots
692 if roots_map[key].last_status() in {403, 404, 410})
693 service_errors = ((key, roots_map[key].last_result)
694 for key in all_roots)
696 raise arvados.errors.KeepReadError(
697 "failed to read {}: no Keep services available ({})".format(
698 loc_s, loop.last_result()))
699 elif not_founds == len(all_roots):
700 raise arvados.errors.NotFoundError(
701 "{} not found".format(loc_s), service_errors)
703 raise arvados.errors.KeepReadError(
704 "failed to read {}".format(loc_s), service_errors, label="service")
# NOTE(review): this listing is elided — the headers/roots_map/threads
# initialization, the try statements, the thread start/join sequence
# around lines 754-766, and the loop.success() tests are missing from
# view.
707 def put(self, data, copies=2, num_retries=None):
708 """Save data in Keep.
710 This method will get a list of Keep services from the API server, and
711 send the data to each one simultaneously in a new thread. Once the
712 uploads are finished, if enough copies are saved, this method returns
713 the most recent HTTP response body. If requests fail to upload
714 enough copies, this method raises KeepWriteError.
717 * data: The string of data to upload.
718 * copies: The number of copies that the user requires be saved.
720 * num_retries: The number of times to retry PUT requests to
721 *each* Keep server if it returns temporary failures, with
722 exponential backoff. The default value is set when the
723 KeepClient is initialized.
# Python 2 text handling: unicode input must be ASCII-encodable;
# anything that is not (byte-)str is rejected.
726 if isinstance(data, unicode):
727 data = data.encode("ascii")
728 elif not isinstance(data, str):
729 raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
731 data_hash = hashlib.md5(data).hexdigest()
737 # Tell the proxy how many copies we want it to store
738 headers['X-Keep-Desired-Replication'] = str(copies)
# The limiter coordinates writer threads and tallies stored replicas.
740 thread_limiter = KeepClient.ThreadLimiter(copies)
741 loop = retry.RetryLoop(num_retries, self._check_loop_result,
743 for tries_left in loop:
745 local_roots = self.map_new_services(
746 roots_map, data_hash,
747 force_rebuild=(tries_left < num_retries), **headers)
748 except Exception as error:
749 loop.save_result(error)
# One writer thread per known service root.
753 for service_root, ks in roots_map.iteritems():
756 t = KeepClient.KeepWriterThread(
760 service_root=service_root,
761 thread_limiter=thread_limiter,
762 timeout=self.current_timeout())
# Success for this round means the limiter counted enough replicas.
767 loop.save_result((thread_limiter.done() >= copies, len(threads)))
770 return thread_limiter.response()
772 raise arvados.errors.KeepWriteError(
773 "failed to write {}: no Keep services available ({})".format(
774 data_hash, loop.last_result()))
776 service_errors = ((key, roots_map[key].last_result)
777 for key in local_roots
778 if not roots_map[key].success_flag)
779 raise arvados.errors.KeepWriteError(
780 "failed to write {} (wanted {} copies but wrote {})".format(
781 data_hash, copies, thread_limiter.done()), service_errors, label="service")
# NOTE(review): listing is elided — the docstring delimiters, the
# f.write(data) call, and the return statement (presumably returning
# the locator) are missing from view.
783 def local_store_put(self, data, copies=1, num_retries=None):
786 This method is used in place of the real put() method when
787 using local storage (see constructor's local_store argument).
789 copies and num_retries arguments are ignored: they are here
790 only for the sake of offering the same call signature as
793 Data stored this way can be retrieved via local_store_get().
795 md5 = hashlib.md5(data).hexdigest()
796 locator = '%s+%d' % (md5, len(data))
# Write to a temp file then rename, so concurrent readers never see a
# partially-written block.
797 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
799 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
800 os.path.join(self.local_store, md5))
# NOTE(review): listing is elided — the try statement before line 806,
# the empty-block return, and the final f.read() return are missing
# from view.
803 def local_store_get(self, loc_s, num_retries=None):
804 """Companion to local_store_put()."""
# Locator parse errors are reported as NotFoundError (below).
806 locator = KeepLocator(loc_s)
808 raise arvados.errors.NotFoundError(
809 "Invalid data locator: '%s'" % loc_s)
# The well-known empty-block locator is served without touching disk.
810 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
812 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
815 def is_cached(self, locator):
816 return self.block_cache.reserve_cache(expect_hash)