import arvados.config as config
import arvados.retry as retry

# Module-level logger shared by all Keep client classes in this file.
_logger = logging.getLogger('arvados.keep')

# Cached singleton managed by Keep.global_client_object(); not intended
# for direct use.
global_client_object = None
class KeepLocator(object):
    """Parse, validate, and reassemble a Keep block locator string.

    A locator has the form <md5sum>[+<size>][+<hint>...], where each hint
    starts with an uppercase letter; an 'A<signature>@<hex timestamp>' hint
    carries a permission signature and its expiry time.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # The size component is optional in a locator.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble md5sum, size, permission hint, and other hints,
        # skipping any component that is unset (None).
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator as '<md5sum>+<size>' with all hints removed."""
        return "%s+%i" % (self.md5sum, self.size)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # The expiry is a 1-8 digit hex Unix timestamp; store it as a
        # datetime for easy comparison in permission_expired().
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex expiry>' hint string, or None if unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # BUGFIX: a hint with no '@' makes the tuple unpacking raise
            # ValueError (not IndexError), so catch ValueError to report
            # the descriptive message instead of the raw unpack error.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True iff the permission signature has expired.

        Compares against as_of_dt, defaulting to the current time; a
        locator with no permission hint never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
    own API client. The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration tuple the cached global client was built from.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings. We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            # Remember what configuration this client was built from so a
            # later call can detect changes and rebuild.
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the global client; see KeepClient.get."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the global client; see KeepClient.put."""
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory MRU cache of Keep block contents, capped by total bytes."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: publishes a block's content once it is ready."""

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until whoever reserved the slot publishes via set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least-recently-used slot that has finished
                # loading; the front of the list is most recent, so scan
                # from the back. (range/reversed instead of xrange keeps
                # this valid on both Python 2 and 3.)
                for i in reversed(range(len(self._cache))):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # Nothing evictable yet: bail out rather than spin.
                    break
                total = sum(slot.size() for slot in self._cache)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            # Test if the locator is already in the cache
            for i, slot in enumerate(self._cache):
                if slot.locator == locator:
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, slot)
                    return slot, False

            # Add a new cache slot for the locator
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
class KeepClient(object):
    """Client for reading and writing data blocks on Keep services."""

    # Default Keep server connection timeout: 2 seconds
    # Default Keep server read timeout: 300 seconds
    # Default Keep proxy connection timeout: 20 seconds
    # Default Keep proxy read timeout: 300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)
217 class ThreadLimiter(object):
219 Limit the number of threads running at a given time to
220 {desired successes} minus {successes reported}. When successes
221 reported == desired, wake up the remaining threads and tell
224 Should be used in a "with" block.
226 def __init__(self, todo):
229 self._response = None
230 self._todo_lock = threading.Semaphore(todo)
231 self._done_lock = threading.Lock()
234 self._todo_lock.acquire()
237 def __exit__(self, type, value, traceback):
238 self._todo_lock.release()
240 def shall_i_proceed(self):
242 Return true if the current thread should do stuff. Return
243 false if the current thread should just stop.
245 with self._done_lock:
246 return (self._done < self._todo)
248 def save_response(self, response_body, replicas_stored):
250 Records a response body (a locator, possibly signed) returned by
251 the Keep server. It is not necessary to save more than
252 one response, since we presume that any locator returned
253 in response to a successful request is valid.
255 with self._done_lock:
256 self._done += replicas_stored
257 self._response = response_body
261 Returns the body from the response to a PUT request.
263 with self._done_lock:
264 return self._response
268 Return how many successes were reported.
270 with self._done_lock:
274 class KeepService(object):
275 # Make requests to a single Keep service, and track results.
276 HTTP_ERRORS = (requests.exceptions.RequestException,
277 socket.error, ssl.SSLError)
279 def __init__(self, root, session, **headers):
281 self.last_result = None
282 self.success_flag = None
283 self.session = session
284 self.get_headers = {'Accept': 'application/octet-stream'}
285 self.get_headers.update(headers)
286 self.put_headers = headers
289 return self.success_flag is not False
292 return self.success_flag is not None
294 def last_status(self):
296 return self.last_result.status_code
297 except AttributeError:
300 def get(self, locator, timeout=None):
301 # locator is a KeepLocator object.
302 url = self.root + str(locator)
303 _logger.debug("Request: GET %s", url)
305 with timer.Timer() as t:
306 result = self.session.get(url.encode('utf-8'),
307 headers=self.get_headers,
309 except self.HTTP_ERRORS as e:
310 _logger.debug("Request fail: GET %s => %s: %s",
311 url, type(e), str(e))
314 self.last_result = result
315 self.success_flag = retry.check_http_response_success(result)
316 content = result.content
317 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
318 self.last_status(), len(content), t.msecs,
319 (len(content)/(1024.0*1024))/t.secs)
320 if self.success_flag:
321 resp_md5 = hashlib.md5(content).hexdigest()
322 if resp_md5 == locator.md5sum:
324 _logger.warning("Checksum fail: md5(%s) = %s",
328 def put(self, hash_s, body, timeout=None):
329 url = self.root + hash_s
330 _logger.debug("Request: PUT %s", url)
332 result = self.session.put(url.encode('utf-8'),
334 headers=self.put_headers,
336 except self.HTTP_ERRORS as e:
337 _logger.debug("Request fail: PUT %s => %s: %s",
338 url, type(e), str(e))
341 self.last_result = result
342 self.success_flag = retry.check_http_response_success(result)
343 return self.success_flag
346 class KeepWriterThread(threading.Thread):
348 Write a blob of data to the given Keep server. On success, call
349 save_response() of the given ThreadLimiter to save the returned
352 def __init__(self, keep_service, **kwargs):
353 super(KeepClient.KeepWriterThread, self).__init__()
354 self.service = keep_service
356 self._success = False
362 with self.args['thread_limiter'] as limiter:
363 if not limiter.shall_i_proceed():
364 # My turn arrived, but the job has been done without
367 self.run_with_limiter(limiter)
369 def run_with_limiter(self, limiter):
370 if self.service.finished():
372 _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
373 str(threading.current_thread()),
374 self.args['data_hash'],
375 len(self.args['data']),
376 self.args['service_root'])
377 self._success = bool(self.service.put(
378 self.args['data_hash'],
380 timeout=self.args.get('timeout', None)))
381 status = self.service.last_status()
383 result = self.service.last_result
384 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
385 str(threading.current_thread()),
386 self.args['data_hash'],
387 len(self.args['data']),
388 self.args['service_root'])
389 # Tick the 'done' counter for the number of replica
390 # reported stored by the server, for the case that
391 # we're talking to a proxy or other backend that
392 # stores to multiple copies for us.
394 replicas_stored = int(result.headers['x-keep-replicas-stored'])
395 except (KeyError, ValueError):
397 limiter.save_response(result.text.strip(), replicas_stored)
398 elif status is not None:
399 _logger.debug("Request fail: PUT %s => %s %s",
400 self.args['data_hash'], status,
401 self.service.last_result.text)
404 def __init__(self, api_client=None, proxy=None,
405 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
406 api_token=None, local_store=None, block_cache=None,
408 """Initialize a new KeepClient.
411 * api_client: The API client to use to find Keep services. If not
412 provided, KeepClient will build one from available Arvados
414 * proxy: If specified, this KeepClient will send requests to this
415 Keep proxy. Otherwise, KeepClient will fall back to the setting
416 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
417 ensure KeepClient does not use a proxy, pass in an empty string.
418 * timeout: The timeout (in seconds) for HTTP requests to Keep
419 non-proxy servers. A tuple of two floats is interpreted as
420 (connection_timeout, read_timeout): see
421 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
423 * proxy_timeout: The timeout (in seconds) for HTTP requests to
424 Keep proxies. A tuple of two floats is interpreted as
425 (connection_timeout, read_timeout). Default: (20, 300).
426 * api_token: If you're not using an API client, but only talking
427 directly to a Keep proxy, this parameter specifies an API token
428 to authenticate Keep requests. It is an error to specify both
429 api_client and api_token. If you specify neither, KeepClient
430 will use one available from the Arvados configuration.
431 * local_store: If specified, this KeepClient will bypass Keep
432 services, and save data to the named directory. If unspecified,
433 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
434 environment variable. If you want to ensure KeepClient does not
435 use local storage, pass in an empty string. This is primarily
436 intended to mock a server for testing.
437 * num_retries: The default number of times to retry failed requests.
438 This will be used as the default num_retries value when get() and
439 put() are called. Default 0.
441 self.lock = threading.Lock()
443 proxy = config.get('ARVADOS_KEEP_PROXY')
444 if api_token is None:
445 if api_client is None:
446 api_token = config.get('ARVADOS_API_TOKEN')
448 api_token = api_client.api_token
449 elif api_client is not None:
451 "can't build KeepClient with both API client and token")
452 if local_store is None:
453 local_store = os.environ.get('KEEP_LOCAL_STORE')
455 self.block_cache = block_cache if block_cache else KeepBlockCache()
456 self.timeout = timeout
457 self.proxy_timeout = proxy_timeout
460 self.local_store = local_store
461 self.get = self.local_store_get
462 self.put = self.local_store_put
464 self.num_retries = num_retries
465 self.session = requests.Session()
467 if not proxy.endswith('/'):
469 self.api_token = api_token
470 self._keep_services = [{
472 '_service_root': proxy,
474 self.using_proxy = True
475 self._static_services_list = True
477 # It's important to avoid instantiating an API client
478 # unless we actually need one, for testing's sake.
479 if api_client is None:
480 api_client = arvados.api('v1')
481 self.api_client = api_client
482 self.api_token = api_client.api_token
483 self._keep_services = None
484 self.using_proxy = None
485 self._static_services_list = False
487 def current_timeout(self):
488 """Return the appropriate timeout to use for this client: the proxy
489 timeout setting if the backend service is currently a proxy,
490 the regular timeout setting otherwise.
492 # TODO(twp): the timeout should be a property of a
493 # KeepService, not a KeepClient. See #4488.
494 return self.proxy_timeout if self.using_proxy else self.timeout
496 def build_services_list(self, force_rebuild=False):
497 if (self._static_services_list or
498 (self._keep_services and not force_rebuild)):
502 keep_services = self.api_client.keep_services().accessible()
503 except Exception: # API server predates Keep services.
504 keep_services = self.api_client.keep_disks().list()
506 self._keep_services = keep_services.execute().get('items')
507 if not self._keep_services:
508 raise arvados.errors.NoKeepServersError()
510 self.using_proxy = any(ks.get('service_type') == 'proxy'
511 for ks in self._keep_services)
513 # Precompute the base URI for each service.
514 for r in self._keep_services:
515 r['_service_root'] = "{}://[{}]:{:d}/".format(
516 'https' if r['service_ssl_flag'] else 'http',
519 _logger.debug(str(self._keep_services))
521 def _service_weight(self, data_hash, service_uuid):
522 """Compute the weight of a Keep service endpoint for a data
523 block with a known hash.
525 The weight is md5(h + u) where u is the last 15 characters of
526 the service endpoint's UUID.
528 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
530 def weighted_service_roots(self, data_hash, force_rebuild=False):
531 """Return an array of Keep service endpoints, in the order in
532 which they should be probed when reading or writing data with
535 self.build_services_list(force_rebuild)
537 # Sort the available services by weight (heaviest first) for
538 # this data_hash, and return their service_roots (base URIs)
541 svc['_service_root'] for svc in sorted(
544 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
545 _logger.debug(data_hash + ': ' + str(sorted_roots))
548 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
549 # roots_map is a dictionary, mapping Keep service root strings
550 # to KeepService objects. Poll for Keep services, and add any
551 # new ones to roots_map. Return the current list of local
553 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
554 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
555 for root in local_roots:
556 if root not in roots_map:
557 roots_map[root] = self.KeepService(root, self.session, **headers)
561 def _check_loop_result(result):
562 # KeepClient RetryLoops should save results as a 2-tuple: the
563 # actual result of the request, and the number of servers available
564 # to receive the request this round.
565 # This method returns True if there's a real result, False if
566 # there are no more servers available, otherwise None.
567 if isinstance(result, Exception):
569 result, tried_server_count = result
570 if (result is not None) and (result is not False):
572 elif tried_server_count < 1:
573 _logger.info("No more Keep services to try; giving up")
579 def get(self, loc_s, num_retries=None):
580 """Get data from Keep.
582 This method fetches one or more blocks of data from Keep. It
583 sends a request each Keep service registered with the API
584 server (or the proxy provided when this client was
585 instantiated), then each service named in location hints, in
586 sequence. As soon as one service provides the data, it's
590 * loc_s: A string of one or more comma-separated locators to fetch.
591 This method returns the concatenation of these blocks.
592 * num_retries: The number of times to retry GET requests to
593 *each* Keep server if it returns temporary failures, with
594 exponential backoff. Note that, in each loop, the method may try
595 to fetch data from every available Keep service, along with any
596 that are named in location hints in the locator. The default value
597 is set when the KeepClient is initialized.
600 return ''.join(self.get(x) for x in loc_s.split(','))
601 locator = KeepLocator(loc_s)
602 expect_hash = locator.md5sum
604 slot, first = self.block_cache.reserve_cache(expect_hash)
609 # See #3147 for a discussion of the loop implementation. Highlights:
610 # * Refresh the list of Keep services after each failure, in case
611 # it's being updated.
612 # * Retry until we succeed, we're out of retries, or every available
613 # service has returned permanent failure.
614 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
615 for hint in locator.hints if hint.startswith('K@')]
616 # Map root URLs their KeepService objects.
617 roots_map = {root: self.KeepService(root) for root in hint_roots}
619 loop = retry.RetryLoop(num_retries, self._check_loop_result,
621 for tries_left in loop:
623 local_roots = self.map_new_services(
624 roots_map, expect_hash,
625 force_rebuild=(tries_left < num_retries))
626 except Exception as error:
627 loop.save_result(error)
630 # Query KeepService objects that haven't returned
631 # permanent failure, in our specified shuffle order.
632 services_to_try = [roots_map[root]
633 for root in (local_roots + hint_roots)
634 if roots_map[root].usable()]
635 for keep_service in services_to_try:
636 blob = keep_service.get(locator, timeout=self.current_timeout())
639 loop.save_result((blob, len(services_to_try)))
641 # Always cache the result, then return it if we succeeded.
643 self.block_cache.cap_cache()
647 # No servers fulfilled the request. Count how many responded
648 # "not found;" if the ratio is high enough (currently 75%), report
649 # Not Found; otherwise a generic error.
650 # Q: Including 403 is necessary for the Keep tests to continue
651 # passing, but maybe they should expect KeepReadError instead?
652 not_founds = sum(1 for ks in roots_map.values()
653 if ks.last_status() in set([403, 404, 410]))
654 if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
655 raise arvados.errors.NotFoundError(loc_s)
657 raise arvados.errors.KeepReadError(loc_s)
660 def put(self, data, copies=2, num_retries=None):
661 """Save data in Keep.
663 This method will get a list of Keep services from the API server, and
664 send the data to each one simultaneously in a new thread. Once the
665 uploads are finished, if enough copies are saved, this method returns
666 the most recent HTTP response body. If requests fail to upload
667 enough copies, this method raises KeepWriteError.
670 * data: The string of data to upload.
671 * copies: The number of copies that the user requires be saved.
673 * num_retries: The number of times to retry PUT requests to
674 *each* Keep server if it returns temporary failures, with
675 exponential backoff. The default value is set when the
676 KeepClient is initialized.
678 data_hash = hashlib.md5(data).hexdigest()
684 # Tell the proxy how many copies we want it to store
685 headers['X-Keep-Desired-Replication'] = str(copies)
687 thread_limiter = KeepClient.ThreadLimiter(copies)
688 loop = retry.RetryLoop(num_retries, self._check_loop_result,
690 for tries_left in loop:
692 local_roots = self.map_new_services(
693 roots_map, data_hash,
694 force_rebuild=(tries_left < num_retries), **headers)
695 except Exception as error:
696 loop.save_result(error)
700 for service_root, ks in roots_map.iteritems():
703 t = KeepClient.KeepWriterThread(
707 service_root=service_root,
708 thread_limiter=thread_limiter,
709 timeout=self.current_timeout())
714 loop.save_result((thread_limiter.done() >= copies, len(threads)))
717 return thread_limiter.response()
718 raise arvados.errors.KeepWriteError(
719 "Write fail for %s: wanted %d but wrote %d" %
720 (data_hash, copies, thread_limiter.done()))
722 # Local storage methods need no-op num_retries arguments to keep
723 # integration tests happy. With better isolation they could
724 # probably be removed again.
725 def local_store_put(self, data, num_retries=0):
726 md5 = hashlib.md5(data).hexdigest()
727 locator = '%s+%d' % (md5, len(data))
728 with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
730 os.rename(os.path.join(self.local_store, md5 + '.tmp'),
731 os.path.join(self.local_store, md5))
734 def local_store_get(self, loc_s, num_retries=0):
736 locator = KeepLocator(loc_s)
738 raise arvados.errors.NotFoundError(
739 "Invalid data locator: '%s'" % loc_s)
740 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
742 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: