25 import arvados.config as config
27 import arvados.retry as retry
# Module-level logger shared by all Keep client classes in this file.
30 _logger = logging.getLogger('arvados.keep')
# Lazily-created singleton KeepClient used by the deprecated `Keep` wrapper class.
31 global_client_object = None
# KeepLocator parses and re-serializes Keep block locators of the form
# "<md5sum>+<size>[+<hints>...]".  NOTE(review): this is a numbered excerpt;
# lines are elided between non-consecutive embedded line numbers.
33 class KeepLocator(object):
# Unix epoch as a naive UTC datetime; used to convert permission expiry
# datetimes back to integer timestamps in permission_hint().
34 EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# A hint is one uppercase type letter followed by hint payload characters.
35 HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37 def __init__(self, locator_str):
40 self._perm_expiry = None
# Locator fields are '+'-separated: md5sum, size, then zero or more hints.
41 pieces = iter(locator_str.split('+'))
42 self.md5sum = next(pieces)
44 self.size = int(next(pieces))
# Remaining pieces are hints; 'A...' hints carry a permission signature.
48 if self.HINT_RE.match(hint) is None:
49 raise ValueError("unrecognized hint data {}".format(hint))
50 elif hint.startswith('A'):
51 self.parse_permission_hint(hint)
53 self.hints.append(hint)
# __str__ rebuilds the full locator, including the permission hint and
# any other hints (fragment; joining logic elided).
57 str(s) for s in [self.md5sum, self.size,
58 self.permission_hint()] + self.hints
# stripped(): locator without any hints — just "<md5sum>+<size>".
62 return "%s+%i" % (self.md5sum, self.size)
# Factory producing validated hex-string properties; used below so that
# md5sum and perm_sig reject values that are not hex strings of the
# exact required length.  (The `def getter(self):` line is elided here.)
64 def _make_hex_prop(name, length):
65 # Build and return a new property with the given name that
66 # must be a hex string of the given length.
67 data_name = '_{}'.format(name)
69 return getattr(self, data_name)
70 def setter(self, hex_str):
# arvados.util.is_hex(s, length) validates both charset and exact length.
71 if not arvados.util.is_hex(hex_str, length):
72 raise ValueError("{} must be a {}-digit hex string: {}".
73 format(name, length, hex_str))
74 setattr(self, data_name, hex_str)
75 return property(getter, setter)
# md5sum: 32 hex digits (MD5); perm_sig: 40 hex digits (SHA-1 HMAC signature).
77 md5sum = _make_hex_prop('md5sum', 32)
78 perm_sig = _make_hex_prop('perm_sig', 40)
# Property getter: the parsed expiry as a naive UTC datetime, or None if
# no permission hint has been parsed.  (@property decorator line elided.)
81 def perm_expiry(self):
82 return self._perm_expiry
# Property setter: accepts a 1-8 digit hex Unix timestamp string and
# stores it as a naive UTC datetime.
85 def perm_expiry(self, value):
86 if not arvados.util.is_hex(value, 1, 8):
88 "permission timestamp must be a hex Unix timestamp: {}".
90 self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
# Rebuild the "A<sig>@<hex timestamp>" hint from the parsed fields.
92 def permission_hint(self):
93 data = [self.perm_sig, self.perm_expiry]
# Convert the expiry datetime back to integer seconds since the epoch.
96 data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
97 return "A{}@{:08x}".format(*data)
# Parse an "A<sig>@<hex timestamp>" permission hint into perm_sig and
# perm_expiry (both validated by their property setters).  The try/except
# wrapping that converts unpack/validation failures is elided here.
99 def parse_permission_hint(self, s):
101 self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
103 raise ValueError("bad permission hint {}".format(s))
def permission_expired(self, as_of_dt=None):
    """Return True if the locator's permission hint has expired.

    Arguments:
    * as_of_dt: the moment to compare against.  Must be a naive UTC
      datetime (perm_expiry is naive UTC, built via utcfromtimestamp).
      Defaults to the current UTC time.
    """
    if self.perm_expiry is None:
        # No permission hint parsed; the locator never expires.
        return False
    elif as_of_dt is None:
        # BUGFIX: perm_expiry is a naive *UTC* datetime, but the original
        # compared it against local-time datetime.now(), giving wrong
        # results in any non-UTC timezone.  Compare in UTC instead.
        as_of_dt = datetime.datetime.utcnow()
    return self.perm_expiry <= as_of_dt
114 """Simple interface to a global KeepClient object.
116 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
117 own API client. The global KeepClient will build an API client from the
118 current Arvados configuration, which may not match the one you built.
# Classmethod returning the process-wide singleton KeepClient, rebuilding
# it whenever relevant configuration has changed since the last call.
# (@classmethod decorator and the `cls._last_key = key` line are elided.)
123 def global_client_object(cls):
124 global global_client_object
125 # Previously, KeepClient would change its behavior at runtime based
126 # on these configuration settings. We simulate that behavior here
127 # by checking the values and returning a new KeepClient if any of
# `key` captures every config value that affects KeepClient behavior;
# a change in any of them invalidates the cached client.
129 key = (config.get('ARVADOS_API_HOST'),
130 config.get('ARVADOS_API_TOKEN'),
131 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
132 config.get('ARVADOS_KEEP_PROXY'),
133 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
134 os.environ.get('KEEP_LOCAL_STORE'))
135 if (global_client_object is None) or (cls._last_key != key):
136 global_client_object = KeepClient()
138 return global_client_object
def get(locator, **kwargs):
    """Fetch a block through the (deprecated) global KeepClient.

    All keyword arguments are forwarded to KeepClient.get().
    """
    client = Keep.global_client_object()
    return client.get(locator, **kwargs)
def put(data, **kwargs):
    """Store a block through the (deprecated) global KeepClient.

    All keyword arguments are forwarded to KeepClient.put().
    """
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
# In-memory LRU-ish cache of Keep blocks, shared by one KeepClient.
# (The `self._cache = []` initializer line is elided in this excerpt.)
148 class KeepBlockCache(object):
149 # Default RAM cache is 256MiB
150 def __init__(self, cache_max=(256 * 1024 * 1024)):
151 self.cache_max = cache_max
# Guards all access to the slot list from multiple reader threads.
153 self._cache_lock = threading.Lock()
# One cache entry: the block's locator, a threading.Event signalled when
# the content arrives, and (elided) a `content` attribute initially None.
155 class CacheSlot(object):
156 def __init__(self, locator):
157 self.locator = locator
158 self.ready = threading.Event()
# set(): store the fetched content and wake any waiters (body elided).
165 def set(self, value):
# size(): bytes held by this slot; 0 while content is still None.
170 if self.content is None:
173 return len(self.content)
# cap_cache: evict ready slots (oldest first) until total size fits.
# (The eviction `del`/`break` lines and _get's hit/miss returns are elided.)
176 '''Cap the cache size to self.cache_max'''
177 with self._cache_lock:
178 # Select all slots except those where ready.is_set() and content is
179 # None (that means there was an error reading the block).
180 self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
181 sm = sum([slot.size() for slot in self._cache])
# Repeatedly drop the last ready slot until under the size cap.
182 while len(self._cache) > 0 and sm > self.cache_max:
183 for i in xrange(len(self._cache)-1, -1, -1):
184 if self._cache[i].ready.is_set():
# Recompute the total after each eviction pass.
187 sm = sum([slot.size() for slot in self._cache])
# _get: linear scan for `locator`; on a hit the slot is moved to the
# front of the list (MRU position).  Caller must hold _cache_lock.
189 def _get(self, locator):
190 # Test if the locator is already in the cache
191 for i in xrange(0, len(self._cache)):
192 if self._cache[i].locator == locator:
195 # move it to the front
197 self._cache.insert(0, n)
201 def get(self, locator):
202 with self._cache_lock:
203 return self._get(locator)
# Returns a (slot, is_new) pair; the lines returning an existing slot
# with is_new=False (and the new slot with True) are elided here.
205 def reserve_cache(self, locator):
206 '''Reserve a cache slot for the specified locator,
207 or return the existing slot.'''
208 with self._cache_lock:
209 n = self._get(locator)
213 # Add a new cache slot for the locator
214 n = KeepBlockCache.CacheSlot(locator)
215 self._cache.insert(0, n)
# Main client for reading and writing Keep blocks, either via a proxy or
# by talking to individual Keep services directly.
218 class KeepClient(object):
220 # Default Keep server connection timeout: 2 seconds
221 # Default Keep server read timeout: 300 seconds
222 # Default Keep proxy connection timeout: 20 seconds
223 # Default Keep proxy read timeout: 300 seconds
# Tuples are (connection_timeout, read_timeout) as used by requests.
224 DEFAULT_TIMEOUT = (2, 300)
225 DEFAULT_PROXY_TIMEOUT = (20, 300)
# Coordinates the pool of writer threads during put(): at most `todo`
# threads proceed, and work stops once `todo` replicas are confirmed.
227 class ThreadLimiter(object):
229 Limit the number of threads running at a given time to
230 {desired successes} minus {successes reported}. When successes
231 reported == desired, wake up the remaining threads and tell
234 Should be used in a "with" block.
236 def __init__(self, todo):
239 self._response = None
# Semaphore bounds concurrency; Lock guards the shared counters.
240 self._todo_lock = threading.Semaphore(todo)
241 self._done_lock = threading.Lock()
# __enter__ (def line elided) blocks until a semaphore slot is free.
244 self._todo_lock.acquire()
247 def __exit__(self, type, value, traceback):
248 self._todo_lock.release()
250 def shall_i_proceed(self):
252 Return true if the current thread should do stuff. Return
253 false if the current thread should just stop.
255 with self._done_lock:
256 return (self._done < self._todo)
258 def save_response(self, response_body, replicas_stored):
260 Records a response body (a locator, possibly signed) returned by
261 the Keep server. It is not necessary to save more than
262 one response, since we presume that any locator returned
263 in response to a successful request is valid.
265 with self._done_lock:
# A proxy may store several replicas per request, so count them all.
266 self._done += replicas_stored
267 self._response = response_body
# response() (def line elided): last saved PUT response body, or None.
271 Returns the body from the response to a PUT request.
273 with self._done_lock:
274 return self._response
# done() (def/return lines elided): replicas confirmed stored so far.
278 Return how many successes were reported.
280 with self._done_lock:
284 class KeepService(object):
285 # Make requests to a single Keep service, and track results.
# Network-level failures treated as retryable transport errors.
286 HTTP_ERRORS = (requests.exceptions.RequestException,
287 socket.error, ssl.SSLError)
289 def __init__(self, root, session, **headers):
# success_flag: None = untried, True = succeeded, False = permanent failure.
291 self.last_result = None
292 self.success_flag = None
293 self.session = session
294 self.get_headers = {'Accept': 'application/octet-stream'}
295 self.get_headers.update(headers)
296 self.put_headers = headers
# usable() (def line elided): service has not permanently failed.
299 return self.success_flag is not False
# finished() (def line elided): service has produced a definitive result.
302 return self.success_flag is not None
# HTTP status of the last response; AttributeError path (last_result is
# None or an exception) returns None — the return line is elided.
304 def last_status(self):
306 return self.last_result.status_code
307 except AttributeError:
310 def get(self, locator, timeout=None):
311 # locator is a KeepLocator object.
312 url = self.root + str(locator)
313 _logger.debug("Request: GET %s", url)
315 with timer.Timer() as t:
316 result = self.session.get(url.encode('utf-8'),
317 headers=self.get_headers,
319 except self.HTTP_ERRORS as e:
320 _logger.debug("Request fail: GET %s => %s: %s",
321 url, type(e), str(e))
324 self.last_result = result
325 self.success_flag = retry.check_http_response_success(result)
326 content = result.content
327 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
328 self.last_status(), len(content), t.msecs,
329 (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
330 if self.success_flag:
331 resp_md5 = hashlib.md5(content).hexdigest()
332 if resp_md5 == locator.md5sum:
334 _logger.warning("Checksum fail: md5(%s) = %s",
338 def put(self, hash_s, body, timeout=None):
339 url = self.root + hash_s
340 _logger.debug("Request: PUT %s", url)
342 result = self.session.put(url.encode('utf-8'),
344 headers=self.put_headers,
346 except self.HTTP_ERRORS as e:
347 _logger.debug("Request fail: PUT %s => %s: %s",
348 url, type(e), str(e))
351 self.last_result = result
352 self.success_flag = retry.check_http_response_success(result)
353 return self.success_flag
# One worker thread per Keep service during put(); reports results back
# through the shared ThreadLimiter.
356 class KeepWriterThread(threading.Thread):
358 Write a blob of data to the given Keep server. On success, call
359 save_response() of the given ThreadLimiter to save the returned
362 def __init__(self, keep_service, **kwargs):
363 super(KeepClient.KeepWriterThread, self).__init__()
364 self.service = keep_service
366 self._success = False
# run() (def line elided): wait for a ThreadLimiter slot, then either
# bail out (enough replicas already stored) or do the actual write.
372 with self.args['thread_limiter'] as limiter:
373 if not limiter.shall_i_proceed():
374 # My turn arrived, but the job has been done without
377 self.run_with_limiter(limiter)
379 def run_with_limiter(self, limiter):
# Skip services that already returned a definitive result.
380 if self.service.finished():
382 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
383 str(threading.current_thread()),
384 self.args['data_hash'],
385 len(self.args['data']),
386 self.args['service_root'])
387 self._success = bool(self.service.put(
388 self.args['data_hash'],
390 timeout=self.args.get('timeout', None)))
391 status = self.service.last_status()
# Success branch (the `if self._success:` line is elided).
393 result = self.service.last_result
394 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
395 str(threading.current_thread()),
396 self.args['data_hash'],
397 len(self.args['data']),
398 self.args['service_root'])
399 # Tick the 'done' counter for the number of replica
400 # reported stored by the server, for the case that
401 # we're talking to a proxy or other backend that
402 # stores to multiple copies for us.
# Defaults to 1 replica (elided) if the header is missing or malformed.
404 replicas_stored = int(result.headers['x-keep-replicas-stored'])
405 except (KeyError, ValueError):
407 limiter.save_response(result.text.strip(), replicas_stored)
408 elif status is not None:
409 _logger.debug("Request fail: PUT %s => %s %s",
410 self.args['data_hash'], status,
411 self.service.last_result.text)
# KeepClient constructor.  Configures one of three modes: local-store
# (testing), proxy, or direct-to-services via the API server.  Several
# branch headers (e.g. `if local_store:`, `if proxy:`, `else:`) are
# elided in this excerpt; the assignments below belong to those branches.
414 def __init__(self, api_client=None, proxy=None,
415 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
416 api_token=None, local_store=None, block_cache=None,
417 num_retries=0, session=None):
418 """Initialize a new KeepClient.
422 The API client to use to find Keep services. If not
423 provided, KeepClient will build one from available Arvados
427 If specified, this KeepClient will send requests to this Keep
428 proxy. Otherwise, KeepClient will fall back to the setting of the
429 ARVADOS_KEEP_PROXY configuration setting. If you want to ensure
430 KeepClient does not use a proxy, pass in an empty string.
433 The timeout (in seconds) for HTTP requests to Keep
434 non-proxy servers. A tuple of two floats is interpreted as
435 (connection_timeout, read_timeout): see
436 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
440 The timeout (in seconds) for HTTP requests to
441 Keep proxies. A tuple of two floats is interpreted as
442 (connection_timeout, read_timeout). Default: (20, 300).
445 If you're not using an API client, but only talking
446 directly to a Keep proxy, this parameter specifies an API token
447 to authenticate Keep requests. It is an error to specify both
448 api_client and api_token. If you specify neither, KeepClient
449 will use one available from the Arvados configuration.
452 If specified, this KeepClient will bypass Keep
453 services, and save data to the named directory. If unspecified,
454 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
455 environment variable. If you want to ensure KeepClient does not
456 use local storage, pass in an empty string. This is primarily
457 intended to mock a server for testing.
460 The default number of times to retry failed requests.
461 This will be used as the default num_retries value when get() and
462 put() are called. Default 0.
465 The requests.Session object to use for get() and put() requests.
466 Will create one if not specified.
468 self.lock = threading.Lock()
# Fall back to configuration for unspecified proxy / token / local store.
470 proxy = config.get('ARVADOS_KEEP_PROXY')
471 if api_token is None:
472 if api_client is None:
473 api_token = config.get('ARVADOS_API_TOKEN')
475 api_token = api_client.api_token
476 elif api_client is not None:
478 "can't build KeepClient with both API client and token")
479 if local_store is None:
480 local_store = os.environ.get('KEEP_LOCAL_STORE')
482 self.block_cache = block_cache if block_cache else KeepBlockCache()
483 self.timeout = timeout
484 self.proxy_timeout = proxy_timeout
# Local-store mode: monkey-patch get/put to the filesystem-backed versions.
487 self.local_store = local_store
488 self.get = self.local_store_get
489 self.put = self.local_store_put
491 self.num_retries = num_retries
492 self.session = session if session is not None else requests.Session()
# Proxy mode: a single static service entry pointing at the proxy URL.
494 if not proxy.endswith('/'):
496 self.api_token = api_token
497 self._keep_services = [{
499 '_service_root': proxy,
501 self.using_proxy = True
502 self._static_services_list = True
504 # It's important to avoid instantiating an API client
505 # unless we actually need one, for testing's sake.
506 if api_client is None:
507 api_client = arvados.api('v1')
# Direct mode: service list is discovered lazily via the API server.
508 self.api_client = api_client
509 self.api_token = api_client.api_token
510 self._keep_services = None
511 self.using_proxy = None
512 self._static_services_list = False
def current_timeout(self):
    """Return the appropriate timeout to use for this client: the proxy
    timeout setting if the backend service is currently a proxy,
    the regular timeout setting otherwise.
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    if self.using_proxy:
        return self.proxy_timeout
    return self.timeout
# Populate self._keep_services from the API server, unless a static list
# (proxy mode) or a cached list (and not force_rebuild) already exists.
# The early `return`, `try:`, and hostname/port format args are elided.
523 def build_services_list(self, force_rebuild=False):
524 if (self._static_services_list or
525 (self._keep_services and not force_rebuild)):
529 keep_services = self.api_client.keep_services().accessible()
530 except Exception: # API server predates Keep services.
531 keep_services = self.api_client.keep_disks().list()
533 self._keep_services = keep_services.execute().get('items')
534 if not self._keep_services:
535 raise arvados.errors.NoKeepServersError()
# using_proxy is True if any discovered service is of type 'proxy'.
537 self.using_proxy = any(ks.get('service_type') == 'proxy'
538 for ks in self._keep_services)
540 # Precompute the base URI for each service.
541 for r in self._keep_services:
# Bracketed host form supports IPv6 literals as well as hostnames.
542 r['_service_root'] = "{}://[{}]:{:d}/".format(
543 'https' if r['service_ssl_flag'] else 'http',
546 _logger.debug(str(self._keep_services))
def _service_weight(self, data_hash, service_uuid):
    """Compute the weight of a Keep service endpoint for a data
    block with a known hash.

    The weight is md5(h + u) where u is the last 15 characters of
    the service endpoint's UUID.
    """
    uuid_suffix = service_uuid[-15:]
    return hashlib.md5(data_hash + uuid_suffix).hexdigest()
# Rendezvous-hash ordering of service roots for one block: each service's
# weight is _service_weight(data_hash, uuid), sorted heaviest first.
# (The sorted()/reversed argument lines and final return are elided.)
557 def weighted_service_roots(self, data_hash, force_rebuild=False):
558 """Return an array of Keep service endpoints, in the order in
559 which they should be probed when reading or writing data with
562 self.build_services_list(force_rebuild)
564 # Sort the available services by weight (heaviest first) for
565 # this data_hash, and return their service_roots (base URIs)
568 svc['_service_root'] for svc in sorted(
571 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
572 _logger.debug(data_hash + ': ' + str(sorted_roots))
# Refresh roots_map (root URL -> KeepService) with any newly discovered
# services for md5_s; the final `return local_roots` line is elided.
575 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
576 # roots_map is a dictionary, mapping Keep service root strings
577 # to KeepService objects. Poll for Keep services, and add any
578 # new ones to roots_map. Return the current list of local
# Every request carries the API token unless the caller overrode it.
580 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
581 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
582 for root in local_roots:
583 if root not in roots_map:
584 roots_map[root] = self.KeepService(root, self.session, **headers)
# RetryLoop success predicate (@staticmethod decorator elided, as are the
# `return True` / `return False` lines after each condition).
588 def _check_loop_result(result):
589 # KeepClient RetryLoops should save results as a 2-tuple: the
590 # actual result of the request, and the number of servers available
591 # to receive the request this round.
592 # This method returns True if there's a real result, False if
593 # there are no more servers available, otherwise None.
594 if isinstance(result, Exception):
596 result, tried_server_count = result
597 if (result is not None) and (result is not False):
599 elif tried_server_count < 1:
600 _logger.info("No more Keep services to try; giving up")
# (The ready/not-ready return lines are elided from this excerpt.
# NOTE(review): block_cache.get() can return None for an uncached
# locator, in which case `slot.ready` here would raise — verify upstream.)
605 def get_from_cache(self, loc):
606 """Fetch a block only if it is in the cache, otherwise return None."""
607 slot = self.block_cache.get(loc)
608 if slot.ready.is_set():
614 def get(self, loc_s, num_retries=None):
615 """Get data from Keep.
617 This method fetches one or more blocks of data from Keep. It
618 sends a request each Keep service registered with the API
619 server (or the proxy provided when this client was
620 instantiated), then each service named in location hints, in
621 sequence. As soon as one service provides the data, it's
625 * loc_s: A string of one or more comma-separated locators to fetch.
626 This method returns the concatenation of these blocks.
627 * num_retries: The number of times to retry GET requests to
628 *each* Keep server if it returns temporary failures, with
629 exponential backoff. Note that, in each loop, the method may try
630 to fetch data from every available Keep service, along with any
631 that are named in location hints in the locator. The default value
632 is set when the KeepClient is initialized.
635 return ''.join(self.get(x) for x in loc_s.split(','))
636 locator = KeepLocator(loc_s)
637 expect_hash = locator.md5sum
638 slot, first = self.block_cache.reserve_cache(expect_hash)
643 # See #3147 for a discussion of the loop implementation. Highlights:
644 # * Refresh the list of Keep services after each failure, in case
645 # it's being updated.
646 # * Retry until we succeed, we're out of retries, or every available
647 # service has returned permanent failure.
648 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
649 for hint in locator.hints if hint.startswith('K@')]
650 # Map root URLs their KeepService objects.
651 roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
653 loop = retry.RetryLoop(num_retries, self._check_loop_result,
655 for tries_left in loop:
657 local_roots = self.map_new_services(
658 roots_map, expect_hash,
659 force_rebuild=(tries_left < num_retries))
660 except Exception as error:
661 loop.save_result(error)
664 # Query KeepService objects that haven't returned
665 # permanent failure, in our specified shuffle order.
666 services_to_try = [roots_map[root]
667 for root in (local_roots + hint_roots)
668 if roots_map[root].usable()]
669 for keep_service in services_to_try:
670 blob = keep_service.get(locator, timeout=self.current_timeout())
673 loop.save_result((blob, len(services_to_try)))
675 # Always cache the result, then return it if we succeeded.
677 self.block_cache.cap_cache()
682 all_roots = local_roots + hint_roots
684 # We never successfully fetched local_roots.
685 all_roots = hint_roots
686 # Q: Including 403 is necessary for the Keep tests to continue
687 # passing, but maybe they should expect KeepReadError instead?
688 not_founds = sum(1 for key in all_roots
689 if roots_map[key].last_status() in {403, 404, 410})
690 service_errors = ((key, roots_map[key].last_result)
691 for key in all_roots)
693 raise arvados.errors.KeepReadError(
694 "failed to read {}: no Keep services available ({})".format(
695 loc_s, loop.last_result()))
696 elif not_founds == len(all_roots):
697 raise arvados.errors.NotFoundError(
698 "{} not found".format(loc_s), service_errors)
700 raise arvados.errors.KeepReadError(
701 "failed to read {}".format(loc_s), service_errors)
704 def put(self, data, copies=2, num_retries=None):
705 """Save data in Keep.
707 This method will get a list of Keep services from the API server, and
708 send the data to each one simultaneously in a new thread. Once the
709 uploads are finished, if enough copies are saved, this method returns
710 the most recent HTTP response body. If requests fail to upload
711 enough copies, this method raises KeepWriteError.
714 * data: The string of data to upload.
715 * copies: The number of copies that the user requires be saved.
717 * num_retries: The number of times to retry PUT requests to
718 *each* Keep server if it returns temporary failures, with
719 exponential backoff. The default value is set when the
720 KeepClient is initialized.
722 data_hash = hashlib.md5(data).hexdigest()
728 # Tell the proxy how many copies we want it to store
729 headers['X-Keep-Desired-Replication'] = str(copies)
731 thread_limiter = KeepClient.ThreadLimiter(copies)
732 loop = retry.RetryLoop(num_retries, self._check_loop_result,
734 for tries_left in loop:
736 local_roots = self.map_new_services(
737 roots_map, data_hash,
738 force_rebuild=(tries_left < num_retries), **headers)
739 except Exception as error:
740 loop.save_result(error)
744 for service_root, ks in roots_map.iteritems():
747 t = KeepClient.KeepWriterThread(
751 service_root=service_root,
752 thread_limiter=thread_limiter,
753 timeout=self.current_timeout())
758 loop.save_result((thread_limiter.done() >= copies, len(threads)))
761 return thread_limiter.response()
763 raise arvados.errors.KeepWriteError(
764 "failed to write {}: no Keep services available ({})".format(
765 data_hash, loop.last_result()))
767 service_errors = ((key, roots_map[key].last_result)
768 for key in local_roots
769 if not roots_map[key].success_flag)
770 raise arvados.errors.KeepWriteError(
771 "failed to write {} (wanted {} copies but wrote {})".format(
772 data_hash, copies, thread_limiter.done()), service_errors)
# Testing-mode put(): write the block to a local directory instead of
# Keep.  The `f.write(data)` and `return locator` lines are elided.
774 def local_store_put(self, data, copies=1, num_retries=None):
777 This method is used in place of the real put() method when
778 using local storage (see constructor's local_store argument).
780 copies and num_retries arguments are ignored: they are here
781 only for the sake of offering the same call signature as
784 Data stored this way can be retrieved via local_store_get().
786 md5 = hashlib.md5(data).hexdigest()
787 locator = '%s+%d' % (md5, len(data))
# Write to a .tmp file first, then rename, so readers never see a
# partially-written block.
788 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
790 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
791 os.path.join(self.local_store, md5))
# Testing-mode get(): read the block from the local directory.  The
# surrounding try/except (invalid locator), the empty-block `return ''`,
# and the final `return f.read()` are elided from this excerpt.
794 def local_store_get(self, loc_s, num_retries=None):
795 """Companion to local_store_put()."""
797 locator = KeepLocator(loc_s)
799 raise arvados.errors.NotFoundError(
800 "Invalid data locator: '%s'" % loc_s)
# The well-known empty-block locator is served without touching disk.
801 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
803 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
def is_cached(self, locator):
    """Return True if the block named by *locator* is in the local cache.

    BUGFIX: the original body returned
    ``self.block_cache.reserve_cache(expect_hash)``, where
    ``expect_hash`` is undefined in this scope — every call raised
    NameError.  It also used reserve_cache(), which *creates* an empty
    slot as a side effect.  Query the cache read-only instead.
    """
    return self.block_cache.get(locator) is not None