25 import arvados.config as config
27 import arvados.retry as retry
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
33 class KeepLocator(object):
34 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37 def __init__(self, locator_str):
40 self._perm_expiry = None
41 pieces = iter(locator_str.split('+'))
42 self.md5sum = next(pieces)
44 self.size = int(next(pieces))
48 if self.HINT_RE.match(hint) is None:
49 raise ValueError("unrecognized hint data {}".format(hint))
50 elif hint.startswith('A'):
51 self.parse_permission_hint(hint)
53 self.hints.append(hint)
57 str(s) for s in [self.md5sum, self.size,
58 self.permission_hint()] + self.hints
62 return "%s+%i" % (self.md5sum, self.size)
def _make_hex_prop(name, length):
    """Build and return a property that only accepts hex strings.

    The property stores its value on the instance under the attribute
    '_<name>'.  Reading returns that attribute; assigning a value that
    arvados.util.is_hex() does not accept for `length` raises ValueError.
    """
    storage_attr = '_{}'.format(name)

    def read_value(self):
        return getattr(self, storage_attr)

    def write_value(self, hex_str):
        if arvados.util.is_hex(hex_str, length):
            setattr(self, storage_attr, hex_str)
            return
        raise ValueError("{} must be a {}-digit hex string: {}".
                         format(name, length, hex_str))

    return property(read_value, write_value)
77 md5sum = _make_hex_prop('md5sum', 32)
78 perm_sig = _make_hex_prop('perm_sig', 40)
81 def perm_expiry(self):
82 return self._perm_expiry
85 def perm_expiry(self, value):
86 if not arvados.util.is_hex(value, 1, 8):
88 "permission timestamp must be a hex Unix timestamp: {}".
90 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
92 def permission_hint(self):
93 data = [self.perm_sig, self.perm_expiry]
96 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
97 return "A{}@{:08x}".format(*data)
99 def parse_permission_hint(self, s):
101 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
103 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Report whether this locator's permission hint has expired.

    Arguments:
    * as_of_dt: The naive UTC datetime to compare the expiry against.
      Defaults to the current UTC time.

    Returns False when the locator has no permission expiry at all,
    otherwise True if the expiry is at or before as_of_dt.
    """
    if self.perm_expiry is None:
        return False
    if as_of_dt is None:
        # perm_expiry is produced by utcfromtimestamp(), i.e. it is a
        # naive UTC datetime.  Compare it against UTC "now"; using local
        # time here would skew the answer by the local UTC offset.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
114 """Simple interface to a global KeepClient object.
116 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
117 own API client. The global KeepClient will build an API client from the
118 current Arvados configuration, which may not match the one you built.
123 def global_client_object(cls):
124 global global_client_object
125 # Previously, KeepClient would change its behavior at runtime based
126 # on these configuration settings. We simulate that behavior here
127 # by checking the values and returning a new KeepClient if any of
129 key = (config.get('ARVADOS_API_HOST'),
130 config.get('ARVADOS_API_TOKEN'),
131 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
132 config.get('ARVADOS_KEEP_PROXY'),
133 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
134 os.environ.get('KEEP_LOCAL_STORE'))
135 if (global_client_object is None) or (cls._last_key != key):
136 global_client_object = KeepClient()
138 return global_client_object
def get(locator, **kwargs):
    """Fetch `locator` through the shared global KeepClient.

    All keyword arguments are forwarded unchanged to that client's get().
    """
    return Keep.global_client_object().get(locator, **kwargs)
def put(data, **kwargs):
    """Store `data` through the shared global KeepClient.

    All keyword arguments are forwarded unchanged to that client's put().
    """
    return Keep.global_client_object().put(data, **kwargs)
148 class KeepBlockCache(object):
149 # Default RAM cache is 256MiB
150 def __init__(self, cache_max=(256 * 1024 * 1024)):
151 self.cache_max = cache_max
153 self._cache_lock = threading.Lock()
155 class CacheSlot(object):
156 def __init__(self, locator):
157 self.locator = locator
158 self.ready = threading.Event()
165 def set(self, value):
170 if self.content is None:
173 return len(self.content)
176 '''Cap the cache size to self.cache_max'''
177 with self._cache_lock:
178 # Select all slots except those where ready.is_set() and content is
179 # None (that means there was an error reading the block).
180 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
181 sm = sum([slot.size() for slot in self._cache])
182 while len(self._cache) > 0 and sm > self.cache_max:
183 for i in xrange(len(self._cache)-1, -1, -1):
184 if self._cache[i].ready.is_set():
187 sm = sum([slot.size() for slot in self._cache])
189 def _get(self, locator):
190 # Test if the locator is already in the cache
191 for i in xrange(0, len(self._cache)):
192 if self._cache[i].locator == locator:
195 # move it to the front
197 self._cache.insert(0, n)
def get(self, locator):
    """Return the result of _get(locator), looked up under the cache lock."""
    with self._cache_lock:
        return self._get(locator)
205 def reserve_cache(self, locator):
206 '''Reserve a cache slot for the specified locator,
207 or return the existing slot.'''
208 with self._cache_lock:
209 n = self._get(locator)
213 # Add a new cache slot for the locator
214 n = KeepBlockCache.CacheSlot(locator)
215 self._cache.insert(0, n)
218 class KeepClient(object):
220 # Default Keep server connection timeout: 2 seconds
221 # Default Keep server read timeout: 300 seconds
222 # Default Keep proxy connection timeout: 20 seconds
223 # Default Keep proxy read timeout: 300 seconds
224 DEFAULT_TIMEOUT = (2, 300)
225 DEFAULT_PROXY_TIMEOUT = (20, 300)
227 class ThreadLimiter(object):
229 Limit the number of threads running at a given time to
230 {desired successes} minus {successes reported}. When successes
231 reported == desired, wake up the remaining threads and tell
234 Should be used in a "with" block.
236 def __init__(self, todo):
239 self._response = None
240 self._todo_lock = threading.Semaphore(todo)
241 self._done_lock = threading.Lock()
244 self._todo_lock.acquire()
def __exit__(self, type, value, traceback):
    """Release the _todo_lock semaphore when the "with" block ends.

    The exception arguments are ignored and nothing is returned, so any
    exception raised inside the block keeps propagating.
    """
    self._todo_lock.release()
def shall_i_proceed(self):
    """
    Tell the calling thread whether it still has work to do.

    Returns true while self._done is below self._todo, false once
    that is no longer the case and the thread should just stop.
    """
    with self._done_lock:
        work_remaining = self._done < self._todo
    return work_remaining
def save_response(self, response_body, replicas_stored):
    """
    Remember a response body (a locator, possibly signed) returned by
    the Keep server, and add replicas_stored to the running count of
    completed copies. Only the latest body is kept: any locator
    returned in response to a successful request is presumed valid,
    so saving more than one is unnecessary.
    """
    with self._done_lock:
        self._response = response_body
        self._done = self._done + replicas_stored
271 Returns the body from the response to a PUT request.
273 with self._done_lock:
274 return self._response
278 Return how many successes were reported.
280 with self._done_lock:
284 class KeepService(object):
285 # Make requests to a single Keep service, and track results.
286 HTTP_ERRORS = (requests.exceptions.RequestException,
287 socket.error, ssl.SSLError)
289 def __init__(self, root, session, **headers):
291 self.last_result = None
292 self.success_flag = None
293 self.session = session
294 self.get_headers = {'Accept': 'application/octet-stream'}
295 self.get_headers.update(headers)
296 self.put_headers = headers
299 return self.success_flag is not False
302 return self.success_flag is not None
304 def last_status(self):
306 return self.last_result.status_code
307 except AttributeError:
310 def get(self, locator, timeout=None):
311 # locator is a KeepLocator object.
312 url = self.root + str(locator)
313 _logger.debug("Request: GET %s", url)
315 with timer.Timer() as t:
316 result = self.session.get(url.encode('utf-8'),
317 headers=self.get_headers,
319 except self.HTTP_ERRORS as e:
320 _logger.debug("Request fail: GET %s => %s: %s",
321 url, type(e), str(e))
324 self.last_result = result
325 self.success_flag = retry.check_http_response_success(result)
326 content = result.content
327 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
328 self.last_status(), len(content), t.msecs,
329 (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
330 if self.success_flag:
331 resp_md5 = hashlib.md5(content).hexdigest()
332 if resp_md5 == locator.md5sum:
334 _logger.warning("Checksum fail: md5(%s) = %s",
338 def put(self, hash_s, body, timeout=None):
339 url = self.root + hash_s
340 _logger.debug("Request: PUT %s", url)
342 result = self.session.put(url.encode('utf-8'),
344 headers=self.put_headers,
346 except self.HTTP_ERRORS as e:
347 _logger.debug("Request fail: PUT %s => %s: %s",
348 url, type(e), str(e))
351 self.last_result = result
352 self.success_flag = retry.check_http_response_success(result)
353 return self.success_flag
356 class KeepWriterThread(threading.Thread):
358 Write a blob of data to the given Keep server. On success, call
359 save_response() of the given ThreadLimiter to save the returned
362 def __init__(self, keep_service, **kwargs):
363 super(KeepClient.KeepWriterThread, self).__init__()
364 self.service = keep_service
366 self._success = False
372 with self.args['thread_limiter'] as limiter:
373 if not limiter.shall_i_proceed():
374 # My turn arrived, but the job has been done without
377 self.run_with_limiter(limiter)
379 def run_with_limiter(self, limiter):
380 if self.service.finished():
382 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
383 str(threading.current_thread()),
384 self.args['data_hash'],
385 len(self.args['data']),
386 self.args['service_root'])
387 self._success = bool(self.service.put(
388 self.args['data_hash'],
390 timeout=self.args.get('timeout', None)))
391 status = self.service.last_status()
393 result = self.service.last_result
394 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
395 str(threading.current_thread()),
396 self.args['data_hash'],
397 len(self.args['data']),
398 self.args['service_root'])
399 # Tick the 'done' counter for the number of replica
400 # reported stored by the server, for the case that
401 # we're talking to a proxy or other backend that
402 # stores to multiple copies for us.
404 replicas_stored = int(result.headers['x-keep-replicas-stored'])
405 except (KeyError, ValueError):
407 limiter.save_response(result.text.strip(), replicas_stored)
408 elif status is not None:
409 _logger.debug("Request fail: PUT %s => %s %s",
410 self.args['data_hash'], status,
411 self.service.last_result.text)
414 def __init__(self, api_client=None, proxy=None,
415 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
416 api_token=None, local_store=None, block_cache=None,
417 num_retries=0, session=None):
418 """Initialize a new KeepClient.
421 * api_client: The API client to use to find Keep services. If not
422 provided, KeepClient will build one from available Arvados
424 * proxy: If specified, this KeepClient will send requests to this
425 Keep proxy. Otherwise, KeepClient will fall back to the setting
426 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
427 ensure KeepClient does not use a proxy, pass in an empty string.
428 * timeout: The timeout (in seconds) for HTTP requests to Keep
429 non-proxy servers. A tuple of two floats is interpreted as
430 (connection_timeout, read_timeout): see
431 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
433 * proxy_timeout: The timeout (in seconds) for HTTP requests to
434 Keep proxies. A tuple of two floats is interpreted as
435 (connection_timeout, read_timeout). Default: (20, 300).
436 * api_token: If you're not using an API client, but only talking
437 directly to a Keep proxy, this parameter specifies an API token
438 to authenticate Keep requests. It is an error to specify both
439 api_client and api_token. If you specify neither, KeepClient
440 will use one available from the Arvados configuration.
441 * local_store: If specified, this KeepClient will bypass Keep
442 services, and save data to the named directory. If unspecified,
443 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
444 environment variable. If you want to ensure KeepClient does not
445 use local storage, pass in an empty string. This is primarily
446 intended to mock a server for testing.
447 * num_retries: The default number of times to retry failed requests.
448 This will be used as the default num_retries value when get() and
449 put() are called. Default 0.
451 self.lock = threading.Lock()
453 proxy = config.get('ARVADOS_KEEP_PROXY')
454 if api_token is None:
455 if api_client is None:
456 api_token = config.get('ARVADOS_API_TOKEN')
458 api_token = api_client.api_token
459 elif api_client is not None:
461 "can't build KeepClient with both API client and token")
462 if local_store is None:
463 local_store = os.environ.get('KEEP_LOCAL_STORE')
465 self.block_cache = block_cache if block_cache else KeepBlockCache()
466 self.timeout = timeout
467 self.proxy_timeout = proxy_timeout
470 self.local_store = local_store
471 self.get = self.local_store_get
472 self.put = self.local_store_put
474 self.num_retries = num_retries
475 self.session = session if session is not None else requests.Session()
477 if not proxy.endswith('/'):
479 self.api_token = api_token
480 self._keep_services = [{
482 '_service_root': proxy,
484 self.using_proxy = True
485 self._static_services_list = True
487 # It's important to avoid instantiating an API client
488 # unless we actually need one, for testing's sake.
489 if api_client is None:
490 api_client = arvados.api('v1')
491 self.api_client = api_client
492 self.api_token = api_client.api_token
493 self._keep_services = None
494 self.using_proxy = None
495 self._static_services_list = False
def current_timeout(self):
    """Pick the timeout setting that applies to this client right now.

    Returns proxy_timeout when using_proxy is true, and the regular
    timeout otherwise.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    if self.using_proxy:
        return self.proxy_timeout
    return self.timeout
506 def build_services_list(self, force_rebuild=False):
507 if (self._static_services_list or
508 (self._keep_services and not force_rebuild)):
512 keep_services = self.api_client.keep_services().accessible()
513 except Exception: # API server predates Keep services.
514 keep_services = self.api_client.keep_disks().list()
516 self._keep_services = keep_services.execute().get('items')
517 if not self._keep_services:
518 raise arvados.errors.NoKeepServersError()
520 self.using_proxy = any(ks.get('service_type') == 'proxy'
521 for ks in self._keep_services)
523 # Precompute the base URI for each service.
524 for r in self._keep_services:
525 r['_service_root'] = "{}://[{}]:{:d}/".format(
526 'https' if r['service_ssl_flag'] else 'http',
529 _logger.debug(str(self._keep_services))
531 def _service_weight(self, data_hash, service_uuid):
532 """Compute the weight of a Keep service endpoint for a data
533 block with a known hash.
535 The weight is md5(h + u) where u is the last 15 characters of
536 the service endpoint's UUID.
538 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
540 def weighted_service_roots(self, data_hash, force_rebuild=False):
541 """Return an array of Keep service endpoints, in the order in
542 which they should be probed when reading or writing data with
545 self.build_services_list(force_rebuild)
547 # Sort the available services by weight (heaviest first) for
548 # this data_hash, and return their service_roots (base URIs)
551 svc['_service_root'] for svc in sorted(
554 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
555 _logger.debug(data_hash + ': ' + str(sorted_roots))
558 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
559 # roots_map is a dictionary, mapping Keep service root strings
560 # to KeepService objects. Poll for Keep services, and add any
561 # new ones to roots_map. Return the current list of local
563 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
564 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
565 for root in local_roots:
566 if root not in roots_map:
567 roots_map[root] = self.KeepService(root, self.session, **headers)
571 def _check_loop_result(result):
572 # KeepClient RetryLoops should save results as a 2-tuple: the
573 # actual result of the request, and the number of servers available
574 # to receive the request this round.
575 # This method returns True if there's a real result, False if
576 # there are no more servers available, otherwise None.
577 if isinstance(result, Exception):
579 result, tried_server_count = result
580 if (result is not None) and (result is not False):
582 elif tried_server_count < 1:
583 _logger.info("No more Keep services to try; giving up")
589 def get(self, loc_s, num_retries=None, cache_only=False):
590 """Get data from Keep.
592 This method fetches one or more blocks of data from Keep. It
593 sends a request to each Keep service registered with the API
594 server (or the proxy provided when this client was
595 instantiated), then each service named in location hints, in
596 sequence. As soon as one service provides the data, it's
600 * loc_s: A string of one or more comma-separated locators to fetch.
601 This method returns the concatenation of these blocks.
602 * num_retries: The number of times to retry GET requests to
603 *each* Keep server if it returns temporary failures, with
604 exponential backoff. Note that, in each loop, the method may try
605 to fetch data from every available Keep service, along with any
606 that are named in location hints in the locator. The default value
607 is set when the KeepClient is initialized.
608 * cache_only: If true, return the block data only if already present in
609 cache, otherwise return None.
612 return ''.join(self.get(x) for x in loc_s.split(','))
613 locator = KeepLocator(loc_s)
614 expect_hash = locator.md5sum
617 slot = self.block_cache.get(expect_hash)
618 if slot.ready.is_set():
623 slot, first = self.block_cache.reserve_cache(expect_hash)
628 # See #3147 for a discussion of the loop implementation. Highlights:
629 # * Refresh the list of Keep services after each failure, in case
630 # it's being updated.
631 # * Retry until we succeed, we're out of retries, or every available
632 # service has returned permanent failure.
633 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
634 for hint in locator.hints if hint.startswith('K@')]
635 # Map root URLs to their KeepService objects.
636 roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
638 loop = retry.RetryLoop(num_retries, self._check_loop_result,
640 for tries_left in loop:
642 local_roots = self.map_new_services(
643 roots_map, expect_hash,
644 force_rebuild=(tries_left < num_retries))
645 except Exception as error:
646 loop.save_result(error)
649 # Query KeepService objects that haven't returned
650 # permanent failure, in our specified shuffle order.
651 services_to_try = [roots_map[root]
652 for root in (local_roots + hint_roots)
653 if roots_map[root].usable()]
654 for keep_service in services_to_try:
655 blob = keep_service.get(locator, timeout=self.current_timeout())
658 loop.save_result((blob, len(services_to_try)))
660 # Always cache the result, then return it if we succeeded.
662 self.block_cache.cap_cache()
667 all_roots = local_roots + hint_roots
669 # We never successfully fetched local_roots.
670 all_roots = hint_roots
671 # Q: Including 403 is necessary for the Keep tests to continue
672 # passing, but maybe they should expect KeepReadError instead?
673 not_founds = sum(1 for key in all_roots
674 if roots_map[key].last_status() in {403, 404, 410})
675 service_errors = ((key, roots_map[key].last_result)
676 for key in all_roots)
678 raise arvados.errors.KeepReadError(
679 "failed to read {}: no Keep services available ({})".format(
680 loc_s, loop.last_result()))
681 elif not_founds == len(all_roots):
682 raise arvados.errors.NotFoundError(
683 "{} not found".format(loc_s), service_errors)
685 raise arvados.errors.KeepReadError(
686 "failed to read {}".format(loc_s), service_errors)
689 def put(self, data, copies=2, num_retries=None):
690 """Save data in Keep.
692 This method will get a list of Keep services from the API server, and
693 send the data to each one simultaneously in a new thread. Once the
694 uploads are finished, if enough copies are saved, this method returns
695 the most recent HTTP response body. If requests fail to upload
696 enough copies, this method raises KeepWriteError.
699 * data: The string of data to upload.
700 * copies: The number of copies that the user requires be saved.
702 * num_retries: The number of times to retry PUT requests to
703 *each* Keep server if it returns temporary failures, with
704 exponential backoff. The default value is set when the
705 KeepClient is initialized.
707 data_hash = hashlib.md5(data).hexdigest()
713 # Tell the proxy how many copies we want it to store
714 headers['X-Keep-Desired-Replication'] = str(copies)
716 thread_limiter = KeepClient.ThreadLimiter(copies)
717 loop = retry.RetryLoop(num_retries, self._check_loop_result,
719 for tries_left in loop:
721 local_roots = self.map_new_services(
722 roots_map, data_hash,
723 force_rebuild=(tries_left < num_retries), **headers)
724 except Exception as error:
725 loop.save_result(error)
729 for service_root, ks in roots_map.iteritems():
732 t = KeepClient.KeepWriterThread(
736 service_root=service_root,
737 thread_limiter=thread_limiter,
738 timeout=self.current_timeout())
743 loop.save_result((thread_limiter.done() >= copies, len(threads)))
746 return thread_limiter.response()
748 raise arvados.errors.KeepWriteError(
749 "failed to write {}: no Keep services available ({})".format(
750 data_hash, loop.last_result()))
752 service_errors = ((key, roots_map[key].last_result)
753 for key in local_roots
754 if not roots_map[key].success_flag)
755 raise arvados.errors.KeepWriteError(
756 "failed to write {} (wanted {} copies but wrote {})".format(
757 data_hash, copies, thread_limiter.done()), service_errors)
759 # Local storage methods need no-op num_retries arguments to keep
760 # integration tests happy. With better isolation they could
761 # probably be removed again.
762 def local_store_put(self, data, num_retries=0):
763 md5 = hashlib.md5(data).hexdigest()
764 locator = '%s+%d' % (md5, len(data))
765 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
767 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
768 os.path.join(self.local_store, md5))
771 def local_store_get(self, loc_s, num_retries=0):
773 locator = KeepLocator(loc_s)
775 raise arvados.errors.NotFoundError(
776 "Invalid data locator: '%s'" % loc_s)
777 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
779 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
def is_cached(self, locator):
    """Return True if the block named by `locator` is ready in the cache.

    Arguments:
    * locator: A locator string.  Only the part before the first '+'
      (the block hash) is used, which is the key get() uses for the
      block cache.

    The lookup uses block_cache.get(), so it never reserves a new slot.
    A slot only counts as cached once its ready event is set and it
    holds content; a slot that is ready with no content records a
    failed read.
    """
    block_hash = locator.split('+', 1)[0]
    slot = self.block_cache.get(block_hash)
    return (slot is not None
            and slot.ready.is_set()
            and slot.content is not None)