import timer
import datetime
import ssl
+import socket
+_logger = logging.getLogger('arvados.keep')
global_client_object = None
-from api import *
-import config
+import arvados
+import arvados.config as config
import arvados.errors
+import arvados.retry as retry
import arvados.util
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
+ HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
def __init__(self, locator_str):
+ # Parse a Keep locator string: "<md5sum>[+<size>][+<hint>...]".
+ # Raises ValueError for hints that do not match HINT_RE.
- self.size = None
- self.loc_hint = None
+ self.hints = []
self._perm_sig = None
self._perm_expiry = None
pieces = iter(locator_str.split('+'))
self.md5sum = next(pieces)
+ try:
+ # Optional second piece is the block size in bytes.
+ # NOTE(review): a non-numeric second piece raises an uncaught
+ # ValueError from int() rather than the "unrecognized hint"
+ # message below — confirm that is intended.
+ self.size = int(next(pieces))
+ except StopIteration:
+ self.size = None
for hint in pieces:
+ # Each hint must be an uppercase letter followed by
+ # [A-Za-z0-9@_-] chars (e.g. "A<sig>@<expiry>", "K@<site>").
- if hint.startswith('A'):
+ if self.HINT_RE.match(hint) is None:
+ raise ValueError("unrecognized hint data {}".format(hint))
+ elif hint.startswith('A'):
self.parse_permission_hint(hint)
- elif hint.startswith('K'):
- self.loc_hint = hint # FIXME
- elif hint.isdigit():
- self.size = int(hint)
else:
- raise ValueError("unrecognized hint data {}".format(hint))
+ # Non-permission hints are kept verbatim, in order.
+ self.hints.append(hint)
def __str__(self):
+ # Reassemble the canonical locator string. A missing size or absent
+ # permission hint yields None and is filtered out by the `if` clause;
+ # remaining hints are appended in the order they were parsed.
return '+'.join(
- str(s) for s in [self.md5sum, self.size, self.loc_hint,
- self.permission_hint()]
+ str(s) for s in [self.md5sum, self.size,
+ self.permission_hint()] + self.hints
if s is not None)
def _make_hex_prop(name, length):
return self.perm_expiry <= as_of_dt
-class Keep:
- @staticmethod
- def global_client_object():
+class Keep(object):
+ """Simple interface to a global KeepClient object.
+
+ THIS CLASS IS DEPRECATED. Please instantiate your own KeepClient with your
+ own API client. The global KeepClient will build an API client from the
+ current Arvados configuration, which may not match the one you built.
+ """
+ # Snapshot of the configuration under which the current global
+ # KeepClient was built; compared on every access so that configuration
+ # changes produce a fresh client.
+ _last_key = None
+
+ @classmethod
+ def global_client_object(cls):
global global_client_object
- if global_client_object == None:
+ # Previously, KeepClient would change its behavior at runtime based
+ # on these configuration settings. We simulate that behavior here
+ # by checking the values and returning a new KeepClient if any of
+ # them have changed.
+ key = (config.get('ARVADOS_API_HOST'),
+ config.get('ARVADOS_API_TOKEN'),
+ config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
+ config.get('ARVADOS_KEEP_PROXY'),
+ config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
+ os.environ.get('KEEP_LOCAL_STORE'))
+ if (global_client_object is None) or (cls._last_key != key):
global_client_object = KeepClient()
+ cls._last_key = key
return global_client_object
+ # Convenience wrapper: store `data` through the shared global KeepClient.
@staticmethod
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
-class KeepClient(object):
+class KeepBlockCache(object):
+ # In-memory, MRU-ordered block cache shared by KeepClient instances.
+ # _cache holds CacheSlot objects, most recently used first; _cache_lock
+ # guards all list manipulation.
+ # Default RAM cache is 256MiB
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
+ self.cache_max = cache_max
+ self._cache = []
+ self._cache_lock = threading.Lock()
+
+ class CacheSlot(object):
+ # One slot per locator. `ready` lets the first fetcher populate the
+ # slot while any concurrent readers block in get() until set() fires.
+ def __init__(self, locator):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+
+ def get(self):
+ # Blocks until set() has been called; content may be None if the
+ # fetch failed (see cap_cache, which prunes such slots).
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+ # NOTE(review): prefer `is None` over `== None` here.
+ if self.content == None:
+ return 0
+ else:
+ return len(self.content)
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ self._cache_lock.acquire()
+ try:
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ while len(self._cache) > 0 and sm > self.cache_max:
+ # Evict the least recently used *ready* slot; slots still
+ # being fetched (not ready) are never evicted here.
+ for i in xrange(len(self._cache)-1, -1, -1):
+ if self._cache[i].ready.is_set():
+ del self._cache[i]
+ break
+ sm = sum([slot.size() for slot in self._cache])
+ finally:
+ self._cache_lock.release()
+ def reserve_cache(self, locator):
+ '''Reserve a cache slot for the specified locator,
+ or return the existing slot.'''
+ # Returns (slot, is_new): is_new is True when the caller is
+ # responsible for fetching the block and calling slot.set().
+ self._cache_lock.acquire()
+ try:
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n, False
+
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
+ finally:
+ self._cache_lock.release()
+
+class KeepClient(object):
class ThreadLimiter(object):
"""
Limit the number of threads running at a given time to
with self._done_lock:
return self._done
+
+ class KeepService(object):
+ # Make requests to a single Keep service, and track results.
+ # success_flag is tri-state: None = not yet tried, True = last
+ # request succeeded, False = permanent failure (per
+ # retry.check_http_response_success). last_result holds either the
+ # httplib2 (response, content) tuple or the raised exception.
+ HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
+ socket.error, ssl.SSLError)
+
+ def __init__(self, root, **headers):
+ # root: service base URL; extra headers are sent on both GET
+ # (merged with Accept) and PUT requests.
+ self.root = root
+ self.last_result = None
+ self.success_flag = None
+ self.get_headers = {'Accept': 'application/octet-stream'}
+ self.get_headers.update(headers)
+ self.put_headers = headers
+
+ def usable(self):
+ # Untried (None) counts as usable; only permanent failure
+ # (False) disqualifies the service.
+ return self.success_flag is not False
+
+ def finished(self):
+ return self.success_flag is not None
+
+ def last_status(self):
+ # HTTP status of the last response as an int, or None if the
+ # last attempt raised or has not happened yet.
+ # NOTE(review): assumes last_result[0] has a .status attribute
+ # (httplib2 Response) — confirm against httplib2 version in use.
+ try:
+ return int(self.last_result[0].status)
+ except (AttributeError, IndexError, ValueError):
+ return None
+
+ def get(self, http, locator):
+ # http is an httplib2.Http object.
+ # locator is a KeepLocator object.
+ # Returns the block content on success, or None on any failure
+ # (transport error, bad status, or md5 checksum mismatch —
+ # note success_flag may still be True after a checksum fail).
+ url = self.root + str(locator)
+ _logger.debug("Request: GET %s", url)
+ try:
+ with timer.Timer() as t:
+ result = http.request(url.encode('utf-8'), 'GET',
+ headers=self.get_headers)
+ except self.HTTP_ERRORS as e:
+ _logger.debug("Request fail: GET %s => %s: %s",
+ url, type(e), str(e))
+ self.last_result = e
+ else:
+ self.last_result = result
+ self.success_flag = retry.check_http_response_success(result)
+ content = result[1]
+ _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+ self.last_status(), len(content), t.msecs,
+ (len(content)/(1024.0*1024))/t.secs)
+ if self.success_flag:
+ # Verify the body against the locator's md5 before
+ # trusting it.
+ resp_md5 = hashlib.md5(content).hexdigest()
+ if resp_md5 == locator.md5sum:
+ return content
+ _logger.warning("Checksum fail: md5(%s) = %s",
+ url, resp_md5)
+ return None
+
+ def put(self, http, hash_s, body):
+ # PUT `body` under hash string `hash_s`. Returns success_flag
+ # (True/False/None as above); response body is available to the
+ # caller via last_result.
+ url = self.root + hash_s
+ _logger.debug("Request: PUT %s", url)
+ try:
+ result = http.request(url.encode('utf-8'), 'PUT',
+ headers=self.put_headers, body=body)
+ except self.HTTP_ERRORS as e:
+ _logger.debug("Request fail: PUT %s => %s: %s",
+ url, type(e), str(e))
+ self.last_result = e
+ else:
+ self.last_result = result
+ self.success_flag = retry.check_http_response_success(result)
+ return self.success_flag
+
+
class KeepWriterThread(threading.Thread):
"""
Write a blob of data to the given Keep server. On success, call
save_response() of the given ThreadLimiter to save the returned
locator.
"""
- def __init__(self, **kwargs):
+ def __init__(self, keep_service, **kwargs):
+ # keep_service: the KeepService to write through; remaining
+ # kwargs (data, data_hash, service_root, thread_limiter,
+ # timeout) are stashed in self.args for run_with_limiter().
super(KeepClient.KeepWriterThread, self).__init__()
+ self.service = keep_service
self.args = kwargs
self._success = False
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
+ # Attempt one PUT through self.service; on success report the
+ # returned locator and replica count to the limiter.
- logging.debug("KeepWriterThread %s proceeding %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
+ # Skip services that already returned a definitive result.
+ if self.service.finished():
+ return
+ _logger.debug("KeepWriterThread %s proceeding %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
h = httplib2.Http(timeout=self.args.get('timeout', None))
- url = self.args['service_root'] + self.args['data_hash']
- api_token = config.get('ARVADOS_API_TOKEN')
- headers = {'Authorization': "OAuth2 %s" % api_token}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- logging.debug("Uploading to {}".format(url))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http(timeout=self.args.get('timeout', None))
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
- if re.match(r'^2\d\d$', resp['status']):
- self._success = True
- logging.debug("KeepWriterThread %s succeeded %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
+ # KeepService.put returns success_flag (possibly None); coerce
+ # to a plain bool for this thread's result.
+ self._success = bool(self.service.put(
+ h, self.args['data_hash'], self.args['data']))
+ status = self.service.last_status()
+ if self._success:
+ resp, body = self.service.last_result
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- limiter.save_response(content.strip(), replicas_stored)
- else:
- logging.warning("Request fail: PUT %s => %s %s" %
- (url, resp['status'], content))
- except (httplib2.HttpLib2Error,
- httplib.HTTPException,
- ssl.SSLError) as e:
- # When using https, timeouts look like ssl.SSLError from here.
- # "SSLError: The write operation timed out"
- logging.warning("Request fail: PUT %s => %s: %s" %
- (url, type(e), str(e)))
-
- def __init__(self, **kwargs):
+ # save_response records the returned locator and credits
+ # replicas_stored copies toward the limiter's goal.
+ limiter.save_response(body.strip(), replicas_stored)
+ elif status is not None:
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.args['data_hash'], status,
+ self.service.last_result[1])
+
+
+ def __init__(self, api_client=None, proxy=None, timeout=300,
+ api_token=None, local_store=None, block_cache=None,
+ num_retries=0):
+ """Initialize a new KeepClient.
+
+ Arguments:
+ * api_client: The API client to use to find Keep services. If not
+ provided, KeepClient will build one from available Arvados
+ configuration.
+ * proxy: If specified, this KeepClient will send requests to this
+ Keep proxy. Otherwise, KeepClient will fall back to the setting
+ of the ARVADOS_KEEP_PROXY configuration setting. If you want to
+ ensure KeepClient does not use a proxy, pass in an empty string.
+ * timeout: The timeout for all HTTP requests, in seconds. Default
+ 300.
+ * api_token: If you're not using an API client, but only talking
+ directly to a Keep proxy, this parameter specifies an API token
+ to authenticate Keep requests. It is an error to specify both
+ api_client and api_token. If you specify neither, KeepClient
+ will use one available from the Arvados configuration.
+ * local_store: If specified, this KeepClient will bypass Keep
+ services, and save data to the named directory. If unspecified,
+ KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
+ environment variable. If you want to ensure KeepClient does not
+ use local storage, pass in an empty string. This is primarily
+ intended to mock a server for testing.
+ * num_retries: The default number of times to retry failed requests.
+ This will be used as the default num_retries value when get() and
+ put() are called. Default 0.
+ """
self.lock = threading.Lock()
- self.service_roots = None
- self._cache_lock = threading.Lock()
- self._cache = []
- # default 256 megabyte cache
- self.cache_max = 256 * 1024 * 1024
- self.using_proxy = False
- self.timeout = kwargs.get('timeout', 60)
-
- def shuffled_service_roots(self, hash):
- if self.service_roots == None:
- self.lock.acquire()
-
- # Override normal keep disk lookup with an explict proxy
- # configuration.
- keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
- if keep_proxy_env != None and len(keep_proxy_env) > 0:
-
- if keep_proxy_env[-1:] != '/':
- keep_proxy_env += "/"
- self.service_roots = [keep_proxy_env]
+ if proxy is None:
+ proxy = config.get('ARVADOS_KEEP_PROXY')
+ if api_token is None:
+ api_token = config.get('ARVADOS_API_TOKEN')
+ elif api_client is not None:
+ raise ValueError(
+ "can't build KeepClient with both API client and token")
+ if local_store is None:
+ local_store = os.environ.get('KEEP_LOCAL_STORE')
+
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
+ if local_store:
+ self.local_store = local_store
+ self.get = self.local_store_get
+ self.put = self.local_store_put
+ else:
+ self.timeout = timeout
+ self.num_retries = num_retries
+ if proxy:
+ if not proxy.endswith('/'):
+ proxy += '/'
+ self.api_token = api_token
+ self.service_roots = [proxy]
self.using_proxy = True
+ self.static_service_roots = True
else:
- try:
- try:
- keep_services = arvados.api().keep_services().accessible().execute()['items']
- except Exception:
- keep_services = arvados.api().keep_disks().list().execute()['items']
-
- if len(keep_services) == 0:
- raise arvados.errors.NoKeepServersError()
-
- if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
- self.using_proxy = True
-
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_services)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+ # It's important to avoid instantiating an API client
+ # unless we actually need one, for testing's sake.
+ if api_client is None:
+ api_client = arvados.api('v1')
+ self.api_client = api_client
+ self.api_token = api_client.api_token
+ self.service_roots = None
+ self.using_proxy = None
+ self.static_service_roots = False
+
+ def build_service_roots(self, force_rebuild=False):
+ # Populate self.service_roots from the API server. No-op when
+ # roots are static (proxy mode) or already built, unless
+ # force_rebuild is set.
+ # NOTE(review): the early-return check reads self.service_roots
+ # outside self.lock — confirm concurrent first calls are safe.
+ if (self.static_service_roots or
+ (self.service_roots and not force_rebuild)):
+ return
+ with self.lock:
+ try:
+ keep_services = self.api_client.keep_services().accessible()
+ except Exception: # API server predates Keep services.
+ keep_services = self.api_client.keep_disks().list()
+
+ keep_services = keep_services.execute().get('items')
+ if not keep_services:
+ raise arvados.errors.NoKeepServersError()
+
+ # Proxy detection: the first accessible service speaks for all.
+ self.using_proxy = (keep_services[0].get('service_type') ==
+ 'proxy')
+
+ # Build "http(s)://host:port/" roots, deduplicated and sorted
+ # so ordering is deterministic.
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
+ _logger.debug(str(self.service_roots))
+
+ def shuffled_service_roots(self, hash, force_rebuild=False):
+ self.build_service_roots(force_rebuild)
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
# Remove the digits just used from the seed
seed = seed[8:]
- logging.debug(str(pseq))
+ _logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
-
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
-
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
+ def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+ # roots_map is a dictionary, mapping Keep service root strings
+ # to KeepService objects. Poll for Keep services, and add any
+ # new ones to roots_map. Return the current list of local
+ # root strings.
+ # Mutates roots_map in place; existing entries are kept so their
+ # accumulated success/failure state survives across retries.
+ # Every new KeepService gets an OAuth2 header unless the caller
+ # supplied its own Authorization header.
+ headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+ local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+ for root in local_roots:
+ if root not in roots_map:
+ roots_map[root] = self.KeepService(root, **headers)
+ return local_roots
- def get(self, locator):
- #logging.debug("Keep.get %s" % (locator))
-
- if re.search(r',', locator):
- return ''.join(self.get(x) for x in locator.split(','))
- if 'KEEP_LOCAL_STORE' in os.environ:
- return KeepClient.local_store_get(locator)
- expect_hash = re.sub(r'\+.*', '', locator)
+ @staticmethod
+ def _check_loop_result(result):
+ # KeepClient RetryLoops should save results as a 2-tuple: the
+ # actual result of the request, and the number of servers available
+ # to receive the request this round.
+ # This method returns True if there's a real result, False if
+ # there are no more servers available, otherwise None.
+ # (None tells the RetryLoop to keep retrying; an Exception saved
+ # as the loop result is treated the same way.)
+ if isinstance(result, Exception):
+ return None
+ result, tried_server_count = result
+ if (result is not None) and (result is not False):
+ return True
+ elif tried_server_count < 1:
+ _logger.info("No more Keep services to try; giving up")
+ return False
+ else:
+ return None
- slot, first = self.reserve_cache(expect_hash)
- #logging.debug("%s %s %s" % (slot, first, expect_hash))
+ @retry.retry_method
+ def get(self, loc_s, num_retries=None):
+ """Get data from Keep.
+
+ This method fetches one or more blocks of data from Keep. It
+ sends a request each Keep service registered with the API
+ server (or the proxy provided when this client was
+ instantiated), then each service named in location hints, in
+ sequence. As soon as one service provides the data, it's
+ returned.
+
+ Arguments:
+ * loc_s: A string of one or more comma-separated locators to fetch.
+ This method returns the concatenation of these blocks.
+ * num_retries: The number of times to retry GET requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. Note that, in each loop, the method may try
+ to fetch data from every available Keep service, along with any
+ that are named in location hints in the locator. The default value
+ is set when the KeepClient is initialized.
+ """
+ if ',' in loc_s:
+ return ''.join(self.get(x) for x in loc_s.split(','))
+ locator = KeepLocator(loc_s)
+ expect_hash = locator.md5sum
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
- try:
- for service_root in self.shuffled_service_roots(expect_hash):
- url = service_root + locator
- api_token = config.get('ARVADOS_API_TOKEN')
- headers = {'Authorization': "OAuth2 %s" % api_token,
- 'Accept': 'application/octet-stream'}
- blob = self.get_url(url, headers, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
-
- for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
- instance = location_hint.group(1)
- url = 'http://keep.' + instance + '.arvadosapi.com/' + locator
- blob = self.get_url(url, {}, expect_hash)
- if blob:
- slot.set(blob)
- self.cap_cache()
- return blob
- except:
- slot.set(None)
- self.cap_cache()
- raise
-
- slot.set(None)
- self.cap_cache()
- raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
-
- def get_url(self, url, headers, expect_hash):
- h = httplib2.Http()
- try:
- logging.info("Request: GET %s" % (url))
- with timer.Timer() as t:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
- t.msecs,
- (len(content)/(1024*1024))/t.secs))
- if re.match(r'^2\d\d$', resp['status']):
- m = hashlib.new('md5')
- m.update(content)
- md5 = m.hexdigest()
- if md5 == expect_hash:
- return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
- except Exception as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
- return None
-
- def put(self, data, **kwargs):
- if 'KEEP_LOCAL_STORE' in os.environ:
- return KeepClient.local_store_put(data)
- m = hashlib.new('md5')
- m.update(data)
- data_hash = m.hexdigest()
- have_copies = 0
- want_copies = kwargs.get('copies', 2)
- if not (want_copies > 0):
+ # See #3147 for a discussion of the loop implementation. Highlights:
+ # * Refresh the list of Keep services after each failure, in case
+ # it's being updated.
+ # * Retry until we succeed, we're out of retries, or every available
+ # service has returned permanent failure.
+ hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+ for hint in locator.hints if hint.startswith('K@')]
+ # Map root URLs their KeepService objects.
+ roots_map = {root: self.KeepService(root) for root in hint_roots}
+ blob = None
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ local_roots = self.map_new_services(
+ roots_map, expect_hash,
+ force_rebuild=(tries_left < num_retries))
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ # Query KeepService objects that haven't returned
+ # permanent failure, in our specified shuffle order.
+ services_to_try = [roots_map[root]
+ for root in (local_roots + hint_roots)
+ if roots_map[root].usable()]
+ http = httplib2.Http(timeout=self.timeout)
+ for keep_service in services_to_try:
+ blob = keep_service.get(http, locator)
+ if blob is not None:
+ break
+ loop.save_result((blob, len(services_to_try)))
+
+ # Always cache the result, then return it if we succeeded.
+ slot.set(blob)
+ self.block_cache.cap_cache()
+ if loop.success():
+ return blob
+
+ # No servers fulfilled the request. Count how many responded
+ # "not found;" if the ratio is high enough (currently 75%), report
+ # Not Found; otherwise a generic error.
+ # Q: Including 403 is necessary for the Keep tests to continue
+ # passing, but maybe they should expect KeepReadError instead?
+ not_founds = sum(1 for ks in roots_map.values()
+ if ks.last_status() in set([403, 404, 410]))
+ if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
+ raise arvados.errors.NotFoundError(loc_s)
+ else:
+ raise arvados.errors.KeepReadError(loc_s)
+
+ @retry.retry_method
+ def put(self, data, copies=2, num_retries=None):
+ """Save data in Keep.
+
+ This method will get a list of Keep services from the API server, and
+ send the data to each one simultaneously in a new thread. Once the
+ uploads are finished, if enough copies are saved, this method returns
+ the most recent HTTP response body. If requests fail to upload
+ enough copies, this method raises KeepWriteError.
+
+ Arguments:
+ * data: The string of data to upload.
+ * copies: The number of copies that the user requires be saved.
+ Default 2.
+ * num_retries: The number of times to retry PUT requests to
+ *each* Keep server if it returns temporary failures, with
+ exponential backoff. The default value is set when the
+ KeepClient is initialized.
+ """
+ data_hash = hashlib.md5(data).hexdigest()
+ if copies < 1:
return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(
- data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- timeout=self.timeout,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- if thread_limiter.done() < want_copies:
- # Retry the threads (i.e., services) that failed the first
- # time around.
- threads_retry = []
+
+ headers = {}
+ if self.using_proxy:
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
+ roots_map = {}
+ thread_limiter = KeepClient.ThreadLimiter(copies)
+ loop = retry.RetryLoop(num_retries, self._check_loop_result,
+ backoff_start=2)
+ for tries_left in loop:
+ try:
+ local_roots = self.map_new_services(
+ roots_map, data_hash,
+ force_rebuild=(tries_left < num_retries), **headers)
+ except Exception as error:
+ loop.save_result(error)
+ continue
+
+ threads = []
+ for service_root, ks in roots_map.iteritems():
+ if ks.finished():
+ continue
+ t = KeepClient.KeepWriterThread(
+ ks,
+ data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter,
+ timeout=self.timeout)
+ t.start()
+ threads.append(t)
for t in threads:
- if not t.success():
- logging.warning("Retrying: PUT %s %s" % (
- t.args['service_root'],
- t.args['data_hash']))
- retry_with_args = t.args.copy()
- t_retry = KeepClient.KeepWriterThread(**retry_with_args)
- t_retry.start()
- threads_retry += [t_retry]
- for t in threads_retry:
t.join()
- have_copies = thread_limiter.done()
- # If we're done, return the response from Keep
- if have_copies >= want_copies:
+ loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+ if loop.success():
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
-
- @staticmethod
- def sign_for_old_server(data_hash, data):
- return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+ (data_hash, copies, thread_limiter.done()))
-
- @staticmethod
- def local_store_put(data):
- m = hashlib.new('md5')
- m.update(data)
- md5 = m.hexdigest()
+ # Local storage methods need no-op num_retries arguments to keep
+ # integration tests happy. With better isolation they could
+ # probably be removed again.
+ def local_store_put(self, data, num_retries=0):
+ # Write `data` into self.local_store and return its
+ # "<md5>+<size>" locator.
+ md5 = hashlib.md5(data).hexdigest()
locator = '%s+%d' % (md5, len(data))
- with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
+ # Write to a .tmp file first, then rename into place, so readers
+ # never observe a partially written block.
+ with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
f.write(data)
- os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
- os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
+ os.rename(os.path.join(self.local_store, md5 + '.tmp'),
+ os.path.join(self.local_store, md5))
return locator
- @staticmethod
- def local_store_get(locator):
- r = re.search('^([0-9a-f]{32,})', locator)
- if not r:
+ def local_store_get(self, loc_s, num_retries=0):
+ # Read a block back from self.local_store by its locator string.
+ # Raises NotFoundError for unparseable locators; num_retries is
+ # accepted but unused (see comment on local_store_put).
+ try:
+ locator = KeepLocator(loc_s)
+ except ValueError:
raise arvados.errors.NotFoundError(
- "Invalid data locator: '%s'" % locator)
+ "Invalid data locator: '%s'" % loc_s)
+ # The well-known empty-block locator short-circuits without I/O.
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
return ''
- with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
+ with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
return f.read()