25 import arvados.config as config
27 import arvados.retry as retry
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
class KeepLocator(object):
    """Parse and manipulate one Keep block locator string.

    A locator looks like ``<md5sum>[+<size>][+<hint>...]``.  Hints are one
    uppercase letter plus data; 'A' hints carry a permission signature and
    a hex Unix-timestamp expiry, e.g. ``...+A<40-hex-sig>@<8-hex-time>``.
    """
    # Reference point for converting permission-hint expiry timestamps.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one uppercase letter followed by hint data.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare hash with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                # Permission hints are decomposed into sig/expiry fields.
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping components that are unset.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accepts 1 to 8 hex digits; stored as a naive UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A' hint string, or None if sig/expiry are unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Split an 'A<sig>@<expiry>' hint into its two fields."""
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # split() yielded fewer than two fields; unpacking raises
            # ValueError, which we re-raise with a clearer message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired.

        perm_expiry is a naive UTC datetime (built with utcfromtimestamp),
        so the default comparison point must be UTC "now".  Comparing
        against local datetime.now() was a bug on non-UTC hosts.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
111 """Simple interface to a global KeepClient object.
113 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
114 own API client. The global KeepClient will build an API client from the
115 current Arvados configuration, which may not match the one you built.
@classmethod
def global_client_object(cls):
    """Return the shared KeepClient, rebuilding it if relevant
    configuration has changed since the previous call.
    """
    global global_client_object
    # Previously, KeepClient would change its behavior at runtime based
    # on these configuration settings. We simulate that behavior here
    # by checking the values and returning a new KeepClient if any of
    # them have changed.
    key = (config.get('ARVADOS_API_HOST'),
           config.get('ARVADOS_API_TOKEN'),
           config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
           config.get('ARVADOS_KEEP_PROXY'),
           config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
           os.environ.get('KEEP_LOCAL_STORE'))
    if (global_client_object is None) or (cls._last_key != key):
        global_client_object = KeepClient()
        # Remember the configuration snapshot; without this the cached
        # client would be rebuilt on every call.
        # NOTE(review): assumes a class-level `_last_key = None` default
        # exists on Keep -- confirm upstream.
        cls._last_key = key
    return global_client_object
def get(locator, **kwargs):
    """Fetch a Keep block via the shared global KeepClient (deprecated).

    Extra keyword arguments are passed through to KeepClient.get().
    """
    # NOTE(review): presumably decorated with @staticmethod in the full
    # file (no self/cls parameter) -- confirm upstream.
    return Keep.global_client_object().get(locator, **kwargs)
def put(data, **kwargs):
    """Store data in Keep via the shared global KeepClient (deprecated).

    Extra keyword arguments are passed through to KeepClient.put().
    """
    # NOTE(review): presumably decorated with @staticmethod in the full
    # file (no self/cls parameter) -- confirm upstream.
    return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """Thread-safe in-memory cache of Keep block contents.

    Slots are kept in most-recently-used-first order; cap_cache() evicts
    from the tail until total content size fits under cache_max.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait on `ready` until a writer
        has filled in `content` (None means the read failed)."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Blocks until some thread calls set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least recently used slot whose content has
                # arrived (scan from the tail).
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # Subtract instead of re-summing the whole list.
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # No slot is evictable; bail out rather than spinning
                    # forever (the original loop never terminated here).
                    break

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            # Test if the locator is already in the cache
            for i in range(len(self._cache)):
                if self._cache[i].locator == locator:
                    n = self._cache[i]
                    if i != 0:
                        # move it to the front (most recently used)
                        del self._cache[i]
                        self._cache.insert(0, n)
                    return n, False
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
205 class KeepClient(object):
# Default Keep server connection timeout: 2 seconds
# Default Keep server read timeout: 300 seconds
# Default Keep proxy connection timeout: 20 seconds
# Default Keep proxy read timeout: 300 seconds
# Each value is a (connection_timeout, read_timeout) tuple, in the shape
# the requests library accepts for its `timeout` argument.
DEFAULT_TIMEOUT = (2, 300)
DEFAULT_PROXY_TIMEOUT = (20, 300)
class ThreadLimiter(object):
    """
    Limit the number of threads running at a given time to
    {desired successes} minus {successes reported}. When successes
    reported == desired, wake up the remaining threads and tell
    them to quit.

    Should be used in a "with" block.
    """
    def __init__(self, todo):
        # Number of replica successes still wanted.
        self._todo = todo
        self._done = 0
        self._response = None
        # At most `todo` writer threads may run concurrently.
        self._todo_lock = threading.Semaphore(todo)
        # Guards _done and _response.
        self._done_lock = threading.Lock()

    def __enter__(self):
        self._todo_lock.acquire()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # (params renamed from `type`/... to avoid shadowing builtins)
        self._todo_lock.release()

    def shall_i_proceed(self):
        """
        Return true if the current thread should do stuff. Return
        false if the current thread should just stop.
        """
        with self._done_lock:
            return (self._done < self._todo)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server. It is not necessary to save more than
        one response, since we presume that any locator returned
        in response to a successful request is valid.
        """
        with self._done_lock:
            self._done += replicas_stored
            self._response = response_body

    def response(self):
        """
        Returns the body from the response to a PUT request.
        """
        with self._done_lock:
            return self._response

    def done(self):
        """
        Return how many successes were reported.
        """
        with self._done_lock:
            return self._done
class KeepService(object):
    # Make requests to a single Keep service, and track results.
    # Connection-level failures from requests/socket/ssl are treated
    # uniformly as retryable errors.
    HTTP_ERRORS = (requests.exceptions.RequestException,
                   socket.error, ssl.SSLError)

    def __init__(self, root, **headers):
        # Store the base URI; get()/put() build URLs from self.root.
        # (The sampled copy dropped this assignment although both
        # methods read self.root.)
        self.root = root
        self.last_result = None    # last requests.Response, if any
        self.success_flag = None   # None=untried, True=ok, False=permanent fail
        self.get_headers = {'Accept': 'application/octet-stream'}
        self.get_headers.update(headers)
        self.put_headers = headers

    def usable(self):
        """False only after a permanent failure."""
        return self.success_flag is not False

    def finished(self):
        """True once any request has completed (success or failure)."""
        return self.success_flag is not None

    def last_status(self):
        """HTTP status of the last response, or None if none arrived."""
        try:
            return self.last_result.status_code
        except AttributeError:
            return None

    def get(self, locator, timeout=None):
        """GET one block; return its content on success, else None."""
        # locator is a KeepLocator object.
        url = self.root + str(locator)
        _logger.debug("Request: GET %s", url)
        try:
            with timer.Timer() as t:
                result = requests.get(url.encode('utf-8'),
                                      headers=self.get_headers,
                                      timeout=timeout)
        except self.HTTP_ERRORS as e:
            # Connection-level failure: leave success_flag as-is so the
            # service stays usable for a retry.
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(e), str(e))
        else:
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
            content = result.content
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self.last_status(), len(content), t.msecs,
                         (len(content)/(1024.0*1024))/t.secs)
            if self.success_flag:
                # Verify the payload against the locator's md5.
                resp_md5 = hashlib.md5(content).hexdigest()
                if resp_md5 == locator.md5sum:
                    return content
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
        return None

    def put(self, hash_s, body, timeout=None):
        """PUT one block; return success_flag (True/False/None)."""
        url = self.root + hash_s
        _logger.debug("Request: PUT %s", url)
        try:
            result = requests.put(url.encode('utf-8'),
                                  data=body,
                                  headers=self.put_headers,
                                  timeout=timeout)
        except self.HTTP_ERRORS as e:
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(e), str(e))
        else:
            self.last_result = result
            self.success_flag = retry.check_http_response_success(result)
        return self.success_flag
class KeepWriterThread(threading.Thread):
    """
    Write a blob of data to the given Keep server. On success, call
    save_response() of the given ThreadLimiter to save the returned
    locator.
    """
    def __init__(self, keep_service, **kwargs):
        super(KeepClient.KeepWriterThread, self).__init__()
        self.service = keep_service
        # Keep the write parameters (data, data_hash, service_root,
        # thread_limiter, timeout); every method below reads self.args.
        # (The sampled copy dropped this assignment.)
        self.args = kwargs
        self._success = False

    def run(self):
        with self.args['thread_limiter'] as limiter:
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me.
                return
            self.run_with_limiter(limiter)

    def run_with_limiter(self, limiter):
        """Attempt one PUT; report replicas stored to the limiter."""
        if self.service.finished():
            return
        _logger.debug("KeepWriterThread %s proceeding %s %s",
                      str(threading.current_thread()),
                      self.args['data_hash'],
                      self.args['service_root'])
        self._success = bool(self.service.put(
            self.args['data_hash'],
            self.args['data'],
            timeout=self.args.get('timeout', None)))
        status = self.service.last_status()
        if self._success:
            result = self.service.last_result
            _logger.debug("KeepWriterThread %s succeeded %s %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
            try:
                replicas_stored = int(result.headers['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header absent or malformed: assume one replica.
                replicas_stored = 1
            limiter.save_response(result.text.strip(), replicas_stored)
        elif status is not None:
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.args['data_hash'], status,
                          self.service.last_result.text)
def __init__(self, api_client=None, proxy=None,
             timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
             api_token=None, local_store=None, block_cache=None,
             num_retries=0):
    """Initialize a new KeepClient.

    Arguments:
    * api_client: The API client to use to find Keep services. If not
      provided, KeepClient will build one from available Arvados
      configuration.
    * proxy: If specified, this KeepClient will send requests to this
      Keep proxy. Otherwise, KeepClient will fall back to the setting
      of the ARVADOS_KEEP_PROXY configuration setting. If you want to
      ensure KeepClient does not use a proxy, pass in an empty string.
    * timeout: The timeout (in seconds) for HTTP requests to Keep
      non-proxy servers. A tuple of two floats is interpreted as
      (connection_timeout, read_timeout): see
      http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
      Default: (2, 300).
    * proxy_timeout: The timeout (in seconds) for HTTP requests to
      Keep proxies. A tuple of two floats is interpreted as
      (connection_timeout, read_timeout). Default: (20, 300).
    * api_token: If you're not using an API client, but only talking
      directly to a Keep proxy, this parameter specifies an API token
      to authenticate Keep requests. It is an error to specify both
      api_client and api_token. If you specify neither, KeepClient
      will use one available from the Arvados configuration.
    * local_store: If specified, this KeepClient will bypass Keep
      services, and save data to the named directory. If unspecified,
      KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
      environment variable. If you want to ensure KeepClient does not
      use local storage, pass in an empty string. This is primarily
      intended to mock a server for testing.
    * num_retries: The default number of times to retry failed requests.
      This will be used as the default num_retries value when get() and
      put() are called. Default 0.
    """
    self.lock = threading.Lock()
    if proxy is None:
        proxy = config.get('ARVADOS_KEEP_PROXY')
    if api_token is None:
        if api_client is None:
            api_token = config.get('ARVADOS_API_TOKEN')
        else:
            api_token = api_client.api_token
    elif api_client is not None:
        raise ValueError(
            "can't build KeepClient with both API client and token")
    if local_store is None:
        local_store = os.environ.get('KEEP_LOCAL_STORE')

    self.block_cache = block_cache if block_cache else KeepBlockCache()
    self.timeout = timeout
    self.proxy_timeout = proxy_timeout

    if local_store:
        # Local-store mode: swap in filesystem-backed get/put.
        self.local_store = local_store
        self.get = self.local_store_get
        self.put = self.local_store_put
    else:
        self.num_retries = num_retries
        if proxy:
            if not proxy.endswith('/'):
                proxy += '/'
            self.api_token = api_token
            # A single static "service": the proxy itself.
            self._keep_services = [{
                'uuid': 'proxy',
                '_service_root': proxy,
                }]
            self.using_proxy = True
            self._static_services_list = True
        else:
            # It's important to avoid instantiating an API client
            # unless we actually need one, for testing's sake.
            if api_client is None:
                api_client = arvados.api('v1')
            self.api_client = api_client
            self.api_token = api_client.api_token
            self._keep_services = None
            self.using_proxy = None
            self._static_services_list = False
def current_timeout(self):
    """Return the appropriate timeout to use for this client: the proxy
    timeout setting if the backend service is currently a proxy,
    the regular timeout setting otherwise.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    if self.using_proxy:
        return self.proxy_timeout
    return self.timeout
def build_services_list(self, force_rebuild=False):
    """Populate self._keep_services from the API server.

    No-op when the list is static (proxy mode) or already built,
    unless force_rebuild is true.
    """
    if (self._static_services_list or
          (self._keep_services and not force_rebuild)):
        return
    with self.lock:
        try:
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        self._keep_services = keep_services.execute().get('items')
        if not self._keep_services:
            raise arvados.errors.NoKeepServersError()

        self.using_proxy = any(ks.get('service_type') == 'proxy'
                               for ks in self._keep_services)

        # Precompute the base URI for each service.
        for r in self._keep_services:
            r['_service_root'] = "{}://[{}]:{:d}/".format(
                'https' if r['service_ssl_flag'] else 'http',
                r['service_host'],
                r['service_port'])
        _logger.debug(str(self._keep_services))
514 def _service_weight(self, data_hash, service_uuid):
515 """Compute the weight of a Keep service endpoint for a data
516 block with a known hash.
518 The weight is md5(h + u) where u is the last 15 characters of
519 the service endpoint's UUID.
521 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
def weighted_service_roots(self, data_hash, force_rebuild=False):
    """Return an array of Keep service endpoints, in the order in
    which they should be probed when reading or writing data with
    the given hash.
    """
    self.build_services_list(force_rebuild)

    # Sort the available services by weight (heaviest first) for
    # this data_hash, and return their service_roots (base URIs).
    sorted_roots = [
        svc['_service_root'] for svc in sorted(
            self._keep_services,
            reverse=True,
            key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
    _logger.debug(data_hash + ': ' + str(sorted_roots))
    return sorted_roots
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
    # roots_map is a dictionary, mapping Keep service root strings
    # to KeepService objects. Poll for Keep services, and add any
    # new ones to roots_map. Return the current list of local
    # root strings.
    headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
    local_roots = self.weighted_service_roots(md5_s, force_rebuild)
    for root in local_roots:
        if root not in roots_map:
            # Each new service inherits the request headers (including
            # the Authorization default set above).
            roots_map[root] = self.KeepService(root, **headers)
    return local_roots
554 def _check_loop_result(result):
555 # KeepClient RetryLoops should save results as a 2-tuple: the
556 # actual result of the request, and the number of servers available
557 # to receive the request this round.
558 # This method returns True if there's a real result, False if
559 # there are no more servers available, otherwise None.
560 if isinstance(result, Exception):
562 result, tried_server_count = result
563 if (result is not None) and (result is not False):
565 elif tried_server_count < 1:
566 _logger.info("No more Keep services to try; giving up")
def get(self, loc_s, num_retries=None):
    """Get data from Keep.

    This method fetches one or more blocks of data from Keep. It
    sends a request each Keep service registered with the API
    server (or the proxy provided when this client was
    instantiated), then each service named in location hints, in
    sequence. As soon as one service provides the data, it's
    returned.

    Arguments:
    * loc_s: A string of one or more comma-separated locators to fetch.
      This method returns the concatenation of these blocks.
    * num_retries: The number of times to retry GET requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. Note that, in each loop, the method may try
      to fetch data from every available Keep service, along with any
      that are named in location hints in the locator. The default value
      is set when the KeepClient is initialized.
    """
    if ',' in loc_s:
        return ''.join(self.get(x) for x in loc_s.split(','))
    if num_retries is None:
        num_retries = self.num_retries
    locator = KeepLocator(loc_s)
    expect_hash = locator.md5sum

    slot, first = self.block_cache.reserve_cache(expect_hash)
    if not first:
        # Another reader is (or was) fetching this block; wait for it.
        v = slot.get()
        return v

    # See #3147 for a discussion of the loop implementation. Highlights:
    # * Refresh the list of Keep services after each failure, in case
    #   it's being updated.
    # * Retry until we succeed, we're out of retries, or every available
    #   service has returned permanent failure.
    hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                  for hint in locator.hints if hint.startswith('K@')]
    # Map root URLs to their KeepService objects.
    roots_map = {root: self.KeepService(root) for root in hint_roots}
    # Initialize blob so an empty service list can't leave it unbound.
    blob = None
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            local_roots = self.map_new_services(
                roots_map, expect_hash,
                force_rebuild=(tries_left < num_retries))
        except Exception as error:
            loop.save_result(error)
            continue

        # Query KeepService objects that haven't returned
        # permanent failure, in our specified shuffle order.
        services_to_try = [roots_map[root]
                           for root in (local_roots + hint_roots)
                           if roots_map[root].usable()]
        for keep_service in services_to_try:
            blob = keep_service.get(locator, timeout=self.current_timeout())
            if blob is not None:
                break
        loop.save_result((blob, len(services_to_try)))

    # Always cache the result, then return it if we succeeded.
    slot.set(blob)
    self.block_cache.cap_cache()
    if loop.success():
        return blob

    # No servers fulfilled the request. Count how many responded
    # "not found;" if the ratio is high enough (currently 75%), report
    # Not Found; otherwise a generic error.
    # Q: Including 403 is necessary for the Keep tests to continue
    # passing, but maybe they should expect KeepReadError instead?
    not_founds = sum(1 for ks in roots_map.values()
                     if ks.last_status() in set([403, 404, 410]))
    if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
        raise arvados.errors.NotFoundError(loc_s)
    else:
        raise arvados.errors.KeepReadError(loc_s)
def put(self, data, copies=2, num_retries=None):
    """Save data in Keep.

    This method will get a list of Keep services from the API server, and
    send the data to each one simultaneously in a new thread. Once the
    uploads are finished, if enough copies are saved, this method returns
    the most recent HTTP response body. If requests fail to upload
    enough copies, this method raises KeepWriteError.

    Arguments:
    * data: The string of data to upload.
    * copies: The number of copies that the user requires be saved.
      Default 2.
    * num_retries: The number of times to retry PUT requests to
      *each* Keep server if it returns temporary failures, with
      exponential backoff. The default value is set when the
      KeepClient is initialized.
    """
    if num_retries is None:
        num_retries = self.num_retries
    data_hash = hashlib.md5(data).hexdigest()
    if copies < 1:
        # Nothing to store; the bare locator is still a valid answer.
        return data_hash

    headers = {}
    if self.using_proxy:
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
    roots_map = {}
    thread_limiter = KeepClient.ThreadLimiter(copies)
    loop = retry.RetryLoop(num_retries, self._check_loop_result,
                           backoff_start=2)
    for tries_left in loop:
        try:
            local_roots = self.map_new_services(
                roots_map, data_hash,
                force_rebuild=(tries_left < num_retries), **headers)
        except Exception as error:
            loop.save_result(error)
            continue

        threads = []
        for service_root, ks in roots_map.iteritems():
            if ks.finished():
                # Already answered (success or permanent failure).
                continue
            t = KeepClient.KeepWriterThread(
                ks,
                data=data,
                data_hash=data_hash,
                service_root=service_root,
                thread_limiter=thread_limiter,
                timeout=self.current_timeout())
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        loop.save_result((thread_limiter.done() >= copies, len(threads)))

    if loop.success():
        return thread_limiter.response()
    raise arvados.errors.KeepWriteError(
        "Write fail for %s: wanted %d but wrote %d" %
        (data_hash, copies, thread_limiter.done()))
# Local storage methods need no-op num_retries arguments to keep
# integration tests happy. With better isolation they could
# probably be removed again.
def local_store_put(self, data, num_retries=0):
    """Write one block into the local-store directory; return its locator.

    The block is written to <md5>.tmp and renamed into place so readers
    never see a partial file.
    """
    # Normalize to bytes so the md5 input and the file contents are the
    # same object; on Python 2 str is already bytes, so behavior is
    # unchanged there, while binary-safe writes now work on Python 3.
    if not isinstance(data, bytes):
        data = data.encode()
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
        f.write(data)
    os.rename(os.path.join(self.local_store, md5 + '.tmp'),
              os.path.join(self.local_store, md5))
    return locator
727 def local_store_get(self, loc_s, num_retries=0):
729 locator = KeepLocator(loc_s)
731 raise arvados.errors.NotFoundError(
732 "Invalid data locator: '%s'" % loc_s)
733 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
735 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: