25 import arvados.config as config
27 import arvados.retry as retry
# Logger shared by everything in this module.
30 _logger = logging.getLogger('arvados.keep')
# Module-level slot for the shared KeepClient; Keep.global_client_object()
# rebinds it through a `global` statement.
31 global_client_object = None
class KeepLocator(object):
    """Parsed form of a Keep block locator string.

    A locator has the shape ``md5sum[+size][+hint...]``.  A hint that starts
    with ``A`` is a permission hint (``A<signature>@<hex expiry>``) and is
    stored in ``perm_sig`` / ``perm_expiry``; every other hint is kept
    verbatim, in order, in ``hints``.  ``str(locator)`` reassembles the
    locator as hash, size, permission hint, then the remaining hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse ``locator_str``.

        Raises ValueError when the hash, the size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare hash with no size component is allowed.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("unrecognized hint data {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the locator string, omitting components that are unset."""
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)

        def getter(self):
            return getattr(self, data_name)

        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} must be a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # The setter takes the hex Unix timestamp found in the locator and
        # stores it as a naive UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<hex expiry>`` hint, or None if incomplete."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint like ``A<sig>@<expiry>``.

        Raises ValueError("bad permission hint ...") when the hint has no
        ``@`` separator; invalid signature/expiry values raise the ValueError
        of the corresponding property setter.
        """
        try:
            # Unpacking a one-element list raises ValueError (not IndexError).
            sig, expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired.

        as_of_dt is a naive UTC datetime to compare against; it defaults to
        the current UTC time.  A locator without an expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC, so "now" must be UTC as well; local
            # time would be off by the machine's UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
111 """Simple interface to a global KeepClient object.
113 THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
114 own API client. The global KeepClient will build an API client from the
115 current Arvados configuration, which may not match the one you built.
# Return the shared KeepClient held in the module-level global of the same
# name, building a new KeepClient() when there is none yet or when the
# settings snapshot `key` differs from cls._last_key.
120 def global_client_object(cls):
121 global global_client_object
122 # Previously, KeepClient would change its behavior at runtime based
123 # on these configuration settings. We simulate that behavior here
124 # by checking the values and returning a new KeepClient if any of
126 key = (config.get('ARVADOS_API_HOST'),
127 config.get('ARVADOS_API_TOKEN'),
128 config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
129 config.get('ARVADOS_KEEP_PROXY'),
130 config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
131 os.environ.get('KEEP_LOCAL_STORE'))
132 if (global_client_object is None) or (cls._last_key != key):
133 global_client_object = KeepClient()
# NOTE(review): the statement that records `key` in cls._last_key is not
# visible in this excerpt -- confirm it accompanies the rebuild above.
135 return global_client_object
# Fetch `locator` through the shared global KeepClient, forwarding any keyword
# arguments to its get().
138 def get(locator, **kwargs):
139 return Keep.global_client_object().get(locator, **kwargs)
# Store `data` through the shared global KeepClient, forwarding any keyword
# arguments to its put().
142 def put(data, **kwargs):
143 return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """Size-capped, in-memory cache of Keep blocks.

    Slots are kept in a list ordered most-recently-reserved first; all list
    manipulation happens under ``_cache_lock``.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: a locator plus content that may arrive later."""

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until set() has been called, then return the content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the content (None marks a failed read) and wake waiters."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content length, or 0 while there is no content."""
            if self.content is None:
                return 0
            return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least recently used slot that is ready; slots
                # still being filled are left for their readers.
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # Nothing is evictable right now; stop instead of
                    # spinning forever while holding the lock.
                    break
                total = sum(slot.size() for slot in self._cache)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, is_new) tuple; is_new is True when the slot was
        created by this call.'''
        with self._cache_lock:
            # Test if the locator is already in the cache
            for i, slot in enumerate(self._cache):
                if slot.locator == locator:
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, slot)
                    return slot, False

            # Add a new cache slot for the locator
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
# Client for reading and writing Keep data blocks (see get() and put()).
211 class KeepClient(object):
213 # Default Keep server connection timeout: 2 seconds
214 # Default Keep server read timeout: 300 seconds
215 # Default Keep proxy connection timeout: 20 seconds
216 # Default Keep proxy read timeout: 300 seconds
# Each pair is (connection timeout, read timeout), in seconds.
217 DEFAULT_TIMEOUT = (2, 300)
218 DEFAULT_PROXY_TIMEOUT = (20, 300)
# Context manager that gates worker threads with a semaphore sized `todo` and
# tracks, under a separate lock, the count of reported successes and the most
# recently saved response body.
220 class ThreadLimiter(object):
222 Limit the number of threads running at a given time to
223 {desired successes} minus {successes reported}. When successes
224 reported == desired, wake up the remaining threads and tell
227 Should be used in a "with" block.
# todo: initial semaphore count.  shall_i_proceed() also compares the success
# counter against self._todo (presumably this same value -- the assignment is
# not visible in this excerpt).
229 def __init__(self, todo):
232 self._response = None
233 self._todo_lock = threading.Semaphore(todo)
234 self._done_lock = threading.Lock()
# Entering the "with" block takes one semaphore slot.
237 self._todo_lock.acquire()
# Leaving the "with" block hands the semaphore slot back.
240 def __exit__(self, type, value, traceback):
241 self._todo_lock.release()
243 def shall_i_proceed(self):
245 Return true if the current thread should do stuff. Return
246 false if the current thread should just stop.
248 with self._done_lock:
249 return (self._done < self._todo)
# replicas_stored is added to the success counter; response_body replaces any
# previously saved response.
251 def save_response(self, response_body, replicas_stored):
253 Records a response body (a locator, possibly signed) returned by
254 the Keep server. It is not necessary to save more than
255 one response, since we presume that any locator returned
256 in response to a successful request is valid.
258 with self._done_lock:
259 self._done += replicas_stored
260 self._response = response_body
264 Returns the body from the response to a PUT request.
266 with self._done_lock:
267 return self._response
271 Return how many successes were reported.
273 with self._done_lock:
# Wraps one Keep service root URL: issues GET/PUT requests against it and
# remembers the last result and whether it counted as a success.
277 class KeepService(object):
278 # Make requests to a single Keep service, and track results.
# Exception types that get() and put() treat as a failed request.
279 HTTP_ERRORS = (requests.exceptions.RequestException,
280 socket.error, ssl.SSLError)
# headers: extra HTTP headers; used as-is for PUT requests and merged over the
# default Accept header for GET requests.
282 def __init__(self, root, **headers):
284 self.last_result = None
285 self.success_flag = None
286 self.get_headers = {'Accept': 'application/octet-stream'}
287 self.get_headers.update(headers)
288 self.put_headers = headers
# True unless the recorded outcome is an explicit False.
291 return self.success_flag is not False
# True once any outcome (True or False) has been recorded.
294 return self.success_flag is not None
# HTTP status code of the last result, when it has one.
296 def last_status(self):
298 return self.last_result.status_code
# Reached when last_result has no status_code attribute (e.g. it is None).
299 except AttributeError:
# Request the block named by `locator` from this service.  For a successful
# response the MD5 of the body is compared with locator.md5sum.
302 def get(self, locator, timeout=None):
303 # locator is a KeepLocator object.
304 url = self.root + str(locator)
305 _logger.debug("Request: GET %s", url)
307 with timer.Timer() as t:
308 result = requests.get(url.encode('utf-8'),
309 headers=self.get_headers,
311 except self.HTTP_ERRORS as e:
312 _logger.debug("Request fail: GET %s => %s: %s",
313 url, type(e), str(e))
316 self.last_result = result
317 self.success_flag = retry.check_http_response_success(result)
318 content = result.content
# NOTE(review): the throughput figure divides by t.secs; confirm Timer can
# never report 0 seconds, otherwise this log call raises ZeroDivisionError.
319 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
320 self.last_status(), len(content), t.msecs,
321 (len(content)/(1024.0*1024))/t.secs)
322 if self.success_flag:
323 resp_md5 = hashlib.md5(content).hexdigest()
324 if resp_md5 == locator.md5sum:
326 _logger.warning("Checksum fail: md5(%s) = %s",
# Upload `body` under the name `hash_s`.  After a completed request the
# response is recorded and success_flag is returned.
330 def put(self, hash_s, body, timeout=None):
331 url = self.root + hash_s
332 _logger.debug("Request: PUT %s", url)
334 result = requests.put(url.encode('utf-8'),
336 headers=self.put_headers,
338 except self.HTTP_ERRORS as e:
339 _logger.debug("Request fail: PUT %s => %s: %s",
340 url, type(e), str(e))
343 self.last_result = result
344 self.success_flag = retry.check_http_response_success(result)
345 return self.success_flag
# Worker thread that PUTs one block to one KeepService, coordinating with its
# peers through the ThreadLimiter supplied as the `thread_limiter` argument.
348 class KeepWriterThread(threading.Thread):
350 Write a blob of data to the given Keep server. On success, call
351 save_response() of the given ThreadLimiter to save the returned
# keep_service: the KeepService this thread writes to.
# kwargs: looked up later through self.args ('data_hash', 'service_root',
# 'thread_limiter' and optionally 'timeout' are read in this class);
# presumably stored as self.args -- that assignment is not visible here.
354 def __init__(self, keep_service, **kwargs):
355 super(KeepClient.KeepWriterThread, self).__init__()
356 self.service = keep_service
358 self._success = False
# The limiter's "with" block holds one semaphore slot for the whole attempt.
364 with self.args['thread_limiter'] as limiter:
365 if not limiter.shall_i_proceed():
366 # My turn arrived, but the job has been done without
369 self.run_with_limiter(limiter)
# Perform the PUT and, on success, report the stored replica count and the
# response text to `limiter`.
371 def run_with_limiter(self, limiter):
# A service that already has a recorded outcome is not tried again
# (presumably by returning here -- the branch body is not visible).
372 if self.service.finished():
374 _logger.debug("KeepWriterThread %s proceeding %s %s",
375 str(threading.current_thread()),
376 self.args['data_hash'],
377 self.args['service_root'])
378 self._success = bool(self.service.put(
379 self.args['data_hash'],
381 timeout=self.args.get('timeout', None)))
382 status = self.service.last_status()
384 result = self.service.last_result
385 _logger.debug("KeepWriterThread %s succeeded %s %s",
386 str(threading.current_thread()),
387 self.args['data_hash'],
388 self.args['service_root'])
389 # Tick the 'done' counter for the number of replica
390 # reported stored by the server, for the case that
391 # we're talking to a proxy or other backend that
392 # stores to multiple copies for us.
394 replicas_stored = int(result.headers['x-keep-replicas-stored'])
# Header absent or not an integer; the fallback value is assigned in a line
# not visible in this excerpt.
395 except (KeyError, ValueError):
397 limiter.save_response(result.text.strip(), replicas_stored)
# The request completed with an HTTP status but was not a success.
398 elif status is not None:
399 _logger.debug("Request fail: PUT %s => %s %s",
400 self.args['data_hash'], status,
401 self.service.last_result.text)
# Build a KeepClient.  Proxy, token and local-store settings that are not
# passed in are looked up from Arvados configuration or the environment before
# the client decides how it will reach Keep.
404 def __init__(self, api_client=None, proxy=None,
405 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
406 api_token=None, local_store=None, block_cache=None,
408 """Initialize a new KeepClient.
411 * api_client: The API client to use to find Keep services. If not
412 provided, KeepClient will build one from available Arvados
414 * proxy: If specified, this KeepClient will send requests to this
415 Keep proxy. Otherwise, KeepClient will fall back to the setting
416 of the ARVADOS_KEEP_PROXY configuration setting. If you want to
417 ensure KeepClient does not use a proxy, pass in an empty string.
418 * timeout: The timeout (in seconds) for HTTP requests to Keep
419 non-proxy servers. A tuple of two floats is interpreted as
420 (connection_timeout, read_timeout): see
421 http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
423 * proxy_timeout: The timeout (in seconds) for HTTP requests to
424 Keep proxies. A tuple of two floats is interpreted as
425 (connection_timeout, read_timeout). Default: (20, 300).
426 * api_token: If you're not using an API client, but only talking
427 directly to a Keep proxy, this parameter specifies an API token
428 to authenticate Keep requests. It is an error to specify both
429 api_client and api_token. If you specify neither, KeepClient
430 will use one available from the Arvados configuration.
431 * local_store: If specified, this KeepClient will bypass Keep
432 services, and save data to the named directory. If unspecified,
433 KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
434 environment variable. If you want to ensure KeepClient does not
435 use local storage, pass in an empty string. This is primarily
436 intended to mock a server for testing.
437 * num_retries: The default number of times to retry failed requests.
438 This will be used as the default num_retries value when get() and
439 put() are called. Default 0.
# Held by build_services_list() while it refreshes the service list --
# presumably; the `with` line is not visible in this excerpt.
441 self.lock = threading.Lock()
# Fall back to the configured proxy (presumably only when `proxy` is None, as
# the docstring describes; the guard is not visible here).
443 proxy = config.get('ARVADOS_KEEP_PROXY')
# Token source: the explicit argument, else the API client's token, else the
# configuration.  Passing both api_client and api_token is rejected.
444 if api_token is None:
445 if api_client is None:
446 api_token = config.get('ARVADOS_API_TOKEN')
448 api_token = api_client.api_token
449 elif api_client is not None:
451 "can't build KeepClient with both API client and token")
452 if local_store is None:
453 local_store = os.environ.get('KEEP_LOCAL_STORE')
# A missing (or falsy) block_cache gets a fresh default KeepBlockCache.
455 self.block_cache = block_cache if block_cache else KeepBlockCache()
456 self.timeout = timeout
457 self.proxy_timeout = proxy_timeout
# Local-store mode: this instance's get/put are rebound to the local_store_*
# stubs, so no Keep service is contacted.
460 self.local_store = local_store
461 self.get = self.local_store_get
462 self.put = self.local_store_put
464 self.num_retries = num_retries
# Proxy mode: a static, one-entry service list whose root is the proxy URL.
466 if not proxy.endswith('/'):
468 self.api_token = api_token
469 self._keep_services = [{
471 '_service_root': proxy,
473 self.using_proxy = True
474 self._static_services_list = True
# API mode: the service list is left empty here and filled in later by
# build_services_list().
476 # It's important to avoid instantiating an API client
477 # unless we actually need one, for testing's sake.
478 if api_client is None:
479 api_client = arvados.api('v1')
480 self.api_client = api_client
481 self.api_token = api_client.api_token
482 self._keep_services = None
483 self.using_proxy = None
484 self._static_services_list = False
def current_timeout(self):
    """Pick the timeout setting that matches the current backend.

    Returns self.proxy_timeout while self.using_proxy is truthy, and
    self.timeout in every other case (including using_proxy being None).
    """
    # TODO(twp): the timeout should be a property of a
    # KeepService, not a KeepClient. See #4488.
    if self.using_proxy:
        return self.proxy_timeout
    return self.timeout
# Populate self._keep_services from the API server, set self.using_proxy, and
# attach a precomputed '_service_root' URL to every service record.
495 def build_services_list(self, force_rebuild=False):
# Nothing to do for a static (proxy) list, or for an already-built list
# unless a rebuild was requested.
496 if (self._static_services_list or
497 (self._keep_services and not force_rebuild)):
501 keep_services = self.api_client.keep_services().accessible()
# NOTE(review): catches every Exception, not just "method missing" -- any
# failure here silently switches to the keep_disks() listing.
502 except Exception: # API server predates Keep services.
503 keep_services = self.api_client.keep_disks().list()
505 self._keep_services = keep_services.execute().get('items')
506 if not self._keep_services:
507 raise arvados.errors.NoKeepServersError()
# A single service of type 'proxy' is enough to mark the client as proxied.
509 self.using_proxy = any(ks.get('service_type') == 'proxy'
510 for ks in self._keep_services)
512 # Precompute the base URI for each service.
513 for r in self._keep_services:
# The host is wrapped in [] in the URL; the format arguments other than the
# scheme are not visible in this excerpt.
514 r['_service_root'] = "{}://[{}]:{:d}/".format(
515 'https' if r['service_ssl_flag'] else 'http',
518 _logger.debug(str(self._keep_services))
520 def _service_weight(self, data_hash, service_uuid):
521 """Compute the weight of a Keep service endpoint for a data
522 block with a known hash.
524 The weight is md5(h + u) where u is the last 15 characters of
525 the service endpoint's UUID.
527 return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
# Order the known services for `data_hash` by their _service_weight() and log
# the resulting list of service roots.
529 def weighted_service_roots(self, data_hash, force_rebuild=False):
530 """Return an array of Keep service endpoints, in the order in
531 which they should be probed when reading or writing data with
# Make sure the service list exists (and refresh it when asked to).
534 self.build_services_list(force_rebuild)
536 # Sort the available services by weight (heaviest first) for
537 # this data_hash, and return their service_roots (base URIs)
540 svc['_service_root'] for svc in sorted(
543 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
# NOTE(review): string concatenation assumes data_hash is a str.
544 _logger.debug(data_hash + ': ' + str(sorted_roots))
# Add a KeepService for every currently known service root that roots_map
# does not contain yet.  roots_map is modified in place; entries already
# present are left untouched.
547 def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
548 # roots_map is a dictionary, mapping Keep service root strings
549 # to KeepService objects. Poll for Keep services, and add any
550 # new ones to roots_map. Return the current list of local
# Callers may override Authorization; otherwise this client's token is sent.
552 headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
553 local_roots = self.weighted_service_roots(md5_s, force_rebuild)
554 for root in local_roots:
555 if root not in roots_map:
556 roots_map[root] = self.KeepService(root, **headers)
# Success checker handed to retry.RetryLoop by get() and put().
560 def _check_loop_result(result):
561 # KeepClient RetryLoops should save results as a 2-tuple: the
562 # actual result of the request, and the number of servers available
563 # to receive the request this round.
564 # This method returns True if there's a real result, False if
565 # there are no more servers available, otherwise None.
# get() and put() save a caught exception in place of the usual 2-tuple, so
# that case is tested before unpacking.
566 if isinstance(result, Exception):
568 result, tried_server_count = result
# Anything other than None/False counts as a real result (including '' or 0).
569 if (result is not None) and (result is not False):
571 elif tried_server_count < 1:
572 _logger.info("No more Keep services to try; giving up")
# Read path: reserves a block-cache slot for the hash, then polls Keep
# services inside a retry.RetryLoop and raises NotFoundError/KeepReadError
# when no service supplies the block.
578 def get(self, loc_s, num_retries=None):
579 """Get data from Keep.
581 This method fetches one or more blocks of data from Keep. It
582 sends a request to each Keep service registered with the API
583 server (or the proxy provided when this client was
584 instantiated), then each service named in location hints, in
585 sequence. As soon as one service provides the data, it's
589 * loc_s: A string of one or more comma-separated locators to fetch.
590 This method returns the concatenation of these blocks.
591 * num_retries: The number of times to retry GET requests to
592 *each* Keep server if it returns temporary failures, with
593 exponential backoff. Note that, in each loop, the method may try
594 to fetch data from every available Keep service, along with any
595 that are named in location hints in the locator. The default value
596 is set when the KeepClient is initialized.
# NOTE(review): the recursive calls for comma-separated locators do not
# forward num_retries -- confirm that falling back to the default is intended.
599 return ''.join(self.get(x) for x in loc_s.split(','))
600 locator = KeepLocator(loc_s)
601 expect_hash = locator.md5sum
# The cache is keyed by the bare MD5 hash, not the full locator string.
603 slot, first = self.block_cache.reserve_cache(expect_hash)
608 # See #3147 for a discussion of the loop implementation. Highlights:
609 # * Refresh the list of Keep services after each failure, in case
610 # it's being updated.
611 # * Retry until we succeed, we're out of retries, or every available
612 # service has returned permanent failure.
# A hint of the form 'K@xyz' maps to the root http://keep.xyz.arvadosapi.com/.
613 hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
614 for hint in locator.hints if hint.startswith('K@')]
615 # Map root URLs to their KeepService objects.
# Hint services are created without extra headers (no Authorization header).
616 roots_map = {root: self.KeepService(root) for root in hint_roots}
618 loop = retry.RetryLoop(num_retries, self._check_loop_result,
620 for tries_left in loop:
622 local_roots = self.map_new_services(
623 roots_map, expect_hash,
# Presumably False on the first pass and True on retries, assuming tries_left
# starts at num_retries -- confirm against retry.RetryLoop.
624 force_rebuild=(tries_left < num_retries))
625 except Exception as error:
626 loop.save_result(error)
629 # Query KeepService objects that haven't returned
630 # permanent failure, in our specified shuffle order.
631 services_to_try = [roots_map[root]
632 for root in (local_roots + hint_roots)
633 if roots_map[root].usable()]
634 for keep_service in services_to_try:
635 blob = keep_service.get(locator, timeout=self.current_timeout())
# Saved as the 2-tuple that _check_loop_result() expects.
638 loop.save_result((blob, len(services_to_try)))
640 # Always cache the result, then return it if we succeeded.
642 self.block_cache.cap_cache()
647 all_roots = local_roots + hint_roots
649 # We never successfully fetched local_roots.
650 all_roots = hint_roots
651 # Q: Including 403 is necessary for the Keep tests to continue
652 # passing, but maybe they should expect KeepReadError instead?
653 not_founds = sum(1 for key in all_roots
654 if roots_map[key].last_status() in {403, 404, 410})
# Note: a generator expression, so it can be consumed only once.
655 service_errors = ((key, roots_map[key].last_result)
656 for key in all_roots)
658 raise arvados.errors.KeepReadError(
659 "failed to read {}: no Keep services available ({})".format(
660 loc_s, loop.last_result()))
# Every service answered 403/404/410: report the block as not found.
661 elif not_founds == len(all_roots):
662 raise arvados.errors.NotFoundError(
663 "{} not found".format(loc_s), service_errors)
665 raise arvados.errors.KeepReadError(
666 "failed to read {}".format(loc_s), service_errors)
# Write path: hashes the data, then starts KeepWriterThreads against the known
# services inside a retry.RetryLoop until `copies` replicas are reported, and
# raises KeepWriteError otherwise.
669 def put(self, data, copies=2, num_retries=None):
670 """Save data in Keep.
672 This method will get a list of Keep services from the API server, and
673 send the data to each one simultaneously in a new thread. Once the
674 uploads are finished, if enough copies are saved, this method returns
675 the most recent HTTP response body. If requests fail to upload
676 enough copies, this method raises KeepWriteError.
679 * data: The string of data to upload.
680 * copies: The number of copies that the user requires be saved.
682 * num_retries: The number of times to retry PUT requests to
683 *each* Keep server if it returns temporary failures, with
684 exponential backoff. The default value is set when the
685 KeepClient is initialized.
687 data_hash = hashlib.md5(data).hexdigest()
693 # Tell the proxy how many copies we want it to store
694 headers['X-Keep-Desired-Replication'] = str(copies)
# The limiter's semaphore starts at `copies`, so at most that many writer
# threads are inside their "with" block at once.
696 thread_limiter = KeepClient.ThreadLimiter(copies)
697 loop = retry.RetryLoop(num_retries, self._check_loop_result,
699 for tries_left in loop:
701 local_roots = self.map_new_services(
702 roots_map, data_hash,
703 force_rebuild=(tries_left < num_retries), **headers)
704 except Exception as error:
705 loop.save_result(error)
# NOTE(review): dict.iteritems() exists only on Python 2.
709 for service_root, ks in roots_map.iteritems():
712 t = KeepClient.KeepWriterThread(
716 service_root=service_root,
717 thread_limiter=thread_limiter,
718 timeout=self.current_timeout())
# Saved as the 2-tuple that _check_loop_result() expects: whether enough
# replicas have been reported, and how many threads were started this round.
723 loop.save_result((thread_limiter.done() >= copies, len(threads)))
726 return thread_limiter.response()
728 raise arvados.errors.KeepWriteError(
729 "failed to write {}: no Keep services available ({})".format(
730 data_hash, loop.last_result()))
# Only services without a recorded success are listed; this is a generator
# expression, so it can be consumed only once.
732 service_errors = ((key, roots_map[key].last_result)
733 for key in local_roots
734 if not roots_map[key].success_flag)
735 raise arvados.errors.KeepWriteError(
736 "failed to write {} (wanted {} copies but wrote {})".format(
737 data_hash, copies, thread_limiter.done()), service_errors)
def local_store_put(self, data, copies=1, num_retries=None):
    """A stub for put().

    This method is used in place of the real put() method when
    using local storage (see constructor's local_store argument).

    copies and num_retries arguments are ignored: they are here
    only for the sake of offering the same call signature as
    put().

    Data stored this way can be retrieved via local_store_get().

    Returns the locator string ``<md5 hex digest>+<length of data>``.
    """
    md5 = hashlib.md5(data).hexdigest()
    locator = '%s+%d' % (md5, len(data))
    final_path = os.path.join(self.local_store, md5)
    tmp_path = final_path + '.tmp'
    # Block data is raw bytes: write in binary mode so nothing is translated
    # or re-encoded on the way to disk.
    with open(tmp_path, 'wb') as f:
        f.write(data)
    # Write to a temporary name first and rename, so a reader never sees a
    # partially written block under the final name.
    os.rename(tmp_path, final_path)
    return locator
759 def local_store_get(self, loc_s, num_retries=None):
760 """Companion to local_store_put()."""
762 locator = KeepLocator(loc_s)
764 raise arvados.errors.NotFoundError(
765 "Invalid data locator: '%s'" % loc_s)
766 if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
768 with open(os.path.join(self.local_store, locator.md5sum), 'r') as f: