X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dccec0cc48025830ac68cbb4a80f5ae73ace3fa7..e8dd6d95b4ab40e6f95b0faa51752599e27f6baf:/sdk/python/arvados/keep.py

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 909ee1fec4..f4c85969c3 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -1,6 +1,4 @@
 import gflags
-import httplib
-import httplib2
 import logging
 import os
 import pprint
@@ -20,15 +18,18 @@ import threading
 import timer
 import datetime
 import ssl
-
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
+import socket
+import requests
 
 import arvados
 import arvados.config as config
 import arvados.errors
+import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -141,8 +142,81 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content is None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        self._cache_lock.acquire()
+        try:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+        finally:
+            self._cache_lock.release()
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        self._cache_lock.acquire()
+        try:
+            # Test if the locator is already in the cache
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i].locator == locator:
+                    n = self._cache[i]
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, n)
+                    return n, False
+
+            # Add a new cache slot for the locator
+            n = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, n)
+            return n, True
+        finally:
+            self._cache_lock.release()
 
 class KeepClient(object):
+
+    # Default Keep server connection timeout:  2 seconds
+    # Default Keep server read timeout:      300 seconds
+    # Default Keep proxy connection timeout:  20 seconds
+    # Default Keep proxy read timeout:       300 seconds
+    DEFAULT_TIMEOUT = (2, 300)
+    DEFAULT_PROXY_TIMEOUT = (20, 300)
+
     class ThreadLimiter(object):
         """
        Limit the number of threads running at a given time to
@@ -200,15 +274,86 @@ class KeepClient(object):
             return self._done
 
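
The CacheSlot added above doubles as a synchronization point: the first reader
of a block populates the slot while later readers block on its Event. A minimal
sketch of the intended calling pattern, assuming this module is importable as
arvados.keep; fetch_block is a hypothetical stand-in for a real Keep read:

    import arvados.keep

    cache = arvados.keep.KeepBlockCache(cache_max=64 * 1024 * 1024)

    def fetch_block(locator):
        # Hypothetical network fetch; returns the block's bytes or raises.
        raise NotImplementedError

    def cached_get(locator):
        slot, first = cache.reserve_cache(locator)
        if not first:
            # Another thread owns this slot; slot.get() blocks on the
            # Event until that thread calls slot.set().
            return slot.get()
        content = None
        try:
            content = fetch_block(locator)
        finally:
            # Set the slot even on failure: a slot that is ready but
            # holds None releases blocked readers with None, and the
            # next cap_cache() call evicts it.
            slot.set(content)
            cache.cap_cache()
        return content
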
+    class KeepService(object):
+        # Make requests to a single Keep service, and track results.
+        HTTP_ERRORS = (requests.exceptions.RequestException,
+                       socket.error, ssl.SSLError)
+
+        def __init__(self, root, **headers):
+            self.root = root
+            self.last_result = None
+            self.success_flag = None
+            self.get_headers = {'Accept': 'application/octet-stream'}
+            self.get_headers.update(headers)
+            self.put_headers = headers
+
+        def usable(self):
+            return self.success_flag is not False
+
+        def finished(self):
+            return self.success_flag is not None
+
+        def last_status(self):
+            try:
+                return self.last_result.status_code
+            except AttributeError:
+                return None
+
+        def get(self, locator, timeout=None):
+            # locator is a KeepLocator object.
+            url = self.root + str(locator)
+            _logger.debug("Request: GET %s", url)
+            try:
+                with timer.Timer() as t:
+                    result = requests.get(url.encode('utf-8'),
+                                          headers=self.get_headers,
+                                          timeout=timeout)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: GET %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+                content = result.content
+                _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+                             self.last_status(), len(content), t.msecs,
+                             (len(content)/(1024.0*1024))/t.secs)
+                if self.success_flag:
+                    resp_md5 = hashlib.md5(content).hexdigest()
+                    if resp_md5 == locator.md5sum:
+                        return content
+                    _logger.warning("Checksum fail: md5(%s) = %s",
+                                    url, resp_md5)
+            return None
+
+        def put(self, hash_s, body, timeout=None):
+            url = self.root + hash_s
+            _logger.debug("Request: PUT %s", url)
+            try:
+                result = requests.put(url.encode('utf-8'),
+                                      data=body,
+                                      headers=self.put_headers,
+                                      timeout=timeout)
+            except self.HTTP_ERRORS as e:
+                _logger.debug("Request fail: PUT %s => %s: %s",
+                              url, type(e), str(e))
+                self.last_result = e
+            else:
+                self.last_result = result
+                self.success_flag = retry.check_http_response_success(result)
+            return self.success_flag
+
+
     class KeepWriterThread(threading.Thread):
         """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.
""" - def __init__(self, api_token, **kwargs): + def __init__(self, keep_service, **kwargs): super(KeepClient.KeepWriterThread, self).__init__() - self._api_token = api_token + self.service = keep_service self.args = kwargs self._success = False @@ -224,55 +369,42 @@ class KeepClient(object): self.run_with_limiter(limiter) def run_with_limiter(self, limiter): + if self.service.finished(): + return _logger.debug("KeepWriterThread %s proceeding %s %s", str(threading.current_thread()), self.args['data_hash'], self.args['service_root']) - h = httplib2.Http(timeout=self.args.get('timeout', None)) - url = self.args['service_root'] + self.args['data_hash'] - headers = {'Authorization': "OAuth2 %s" % (self._api_token,)} - - if self.args['using_proxy']: - # We're using a proxy, so tell the proxy how many copies we - # want it to store - headers['X-Keep-Desired-Replication'] = str(self.args['want_copies']) - - try: - _logger.debug("Uploading to {}".format(url)) - resp, content = h.request(url.encode('utf-8'), 'PUT', - headers=headers, - body=self.args['data']) - if re.match(r'^2\d\d$', resp['status']): - self._success = True - _logger.debug("KeepWriterThread %s succeeded %s %s", - str(threading.current_thread()), - self.args['data_hash'], - self.args['service_root']) + self._success = bool(self.service.put( + self.args['data_hash'], + self.args['data'], + timeout=self.args.get('timeout', None))) + status = self.service.last_status() + if self._success: + result = self.service.last_result + _logger.debug("KeepWriterThread %s succeeded %s %s", + str(threading.current_thread()), + self.args['data_hash'], + self.args['service_root']) + # Tick the 'done' counter for the number of replica + # reported stored by the server, for the case that + # we're talking to a proxy or other backend that + # stores to multiple copies for us. + try: + replicas_stored = int(result.headers['x-keep-replicas-stored']) + except (KeyError, ValueError): replicas_stored = 1 - if 'x-keep-replicas-stored' in resp: - # Tick the 'done' counter for the number of replica - # reported stored by the server, for the case that - # we're talking to a proxy or other backend that - # stores to multiple copies for us. - try: - replicas_stored = int(resp['x-keep-replicas-stored']) - except ValueError: - pass - limiter.save_response(content.strip(), replicas_stored) - else: - _logger.debug("Request fail: PUT %s => %s %s", - url, resp['status'], content) - except (httplib2.HttpLib2Error, - httplib.HTTPException, - ssl.SSLError) as e: - # When using https, timeouts look like ssl.SSLError from here. - # "SSLError: The write operation timed out" - _logger.debug("Request fail: PUT %s => %s: %s", - url, type(e), str(e)) + limiter.save_response(result.text.strip(), replicas_stored) + elif status is not None: + _logger.debug("Request fail: PUT %s => %s %s", + self.args['data_hash'], status, + self.service.last_result.text) - def __init__(self, api_client=None, proxy=None, timeout=60, - api_token=None, local_store=None): + def __init__(self, api_client=None, proxy=None, + timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT, + api_token=None, local_store=None, block_cache=None, + num_retries=0): """Initialize a new KeepClient. Arguments: @@ -283,8 +415,14 @@ class KeepClient(object): Keep proxy. Otherwise, KeepClient will fall back to the setting of the ARVADOS_KEEP_PROXY configuration setting. If you want to ensure KeepClient does not use a proxy, pass in an empty string. - * timeout: The timeout for all HTTP requests, in seconds. 
-          60.
+        * timeout: The timeout (in seconds) for HTTP requests to Keep
+          non-proxy servers.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout): see
+          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
+          Default: (2, 300).
+        * proxy_timeout: The timeout (in seconds) for HTTP requests to
+          Keep proxies.  A tuple of two floats is interpreted as
+          (connection_timeout, read_timeout).  Default: (20, 300).
         * api_token: If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token to
          authenticate Keep requests.  It is an error to specify both
@@ -296,33 +434,44 @@ class KeepClient(object):
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.
+        * num_retries: The default number of times to retry failed requests.
+          This will be used as the default num_retries value when get() and
+          put() are called.  Default 0.
         """
         self.lock = threading.Lock()
         if proxy is None:
             proxy = config.get('ARVADOS_KEEP_PROXY')
         if api_token is None:
-            api_token = config.get('ARVADOS_API_TOKEN')
+            if api_client is None:
+                api_token = config.get('ARVADOS_API_TOKEN')
+            else:
+                api_token = api_client.api_token
         elif api_client is not None:
             raise ValueError(
                 "can't build KeepClient with both API client and token")
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        self.block_cache = block_cache if block_cache else KeepBlockCache()
+        self.timeout = timeout
+        self.proxy_timeout = proxy_timeout
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
-            self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+            self.num_retries = num_retries
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -330,239 +479,249 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._keep_services = None
                 self.using_proxy = None
+                self._static_services_list = False
 
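
The tuple timeouts documented above follow the requests convention: the first
number bounds the TCP connection attempt, the second bounds each read of the
response body, so an unreachable server fails fast while a long download can
still stream for minutes. A small sketch of that behavior in isolation
(hypothetical host name; only the requests library is assumed):

    import requests

    try:
        response = requests.get('http://keep.example.arvadosapi.com/',
                                timeout=(2, 300))
    except requests.exceptions.Timeout:
        # Raised when either phase exceeds its bound.
        response = None

A KeepClient built with timeout=(2, 300) and proxy_timeout=(20, 300) passes
whichever tuple applies straight through to requests on every GET and PUT.
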
- # "hash" is a hex-encoded number at least 8 digits - # (32 bits) long - - # seed used to calculate the next keep server from 'pool' - # to be added to 'pseq' - seed = hash - - # Keep servers still to be added to the ordering - pool = self.service_roots[:] - - # output probe sequence - pseq = [] - - # iterate while there are servers left to be assigned - while len(pool) > 0: - if len(seed) < 8: - # ran out of digits in the seed - if len(pseq) < len(hash) / 4: - # the number of servers added to the probe sequence is less - # than the number of 4-digit slices in 'hash' so refill the - # seed with the last 4 digits and then append the contents - # of 'hash'. - seed = hash[-4:] + hash - else: - # refill the seed with the contents of 'hash' - seed += hash - - # Take the next 8 digits (32 bytes) and interpret as an integer, - # then modulus with the size of the remaining pool to get the next - # selected server. - probe = int(seed[0:8], 16) % len(pool) - - # Append the selected server to the probe sequence and remove it - # from the pool. - pseq += [pool[probe]] - pool = pool[:probe] + pool[probe+1:] - - # Remove the digits just used from the seed - seed = seed[8:] - _logger.debug(str(pseq)) - return pseq - - class CacheSlot(object): - def __init__(self, locator): - self.locator = locator - self.ready = threading.Event() - self.content = None - - def get(self): - self.ready.wait() - return self.content - - def set(self, value): - self.content = value - self.ready.set() + def current_timeout(self): + """Return the appropriate timeout to use for this client: the proxy + timeout setting if the backend service is currently a proxy, + the regular timeout setting otherwise. + """ + # TODO(twp): the timeout should be a property of a + # KeepService, not a KeepClient. See #4488. + return self.proxy_timeout if self.using_proxy else self.timeout + + def build_services_list(self, force_rebuild=False): + if (self._static_services_list or + (self._keep_services and not force_rebuild)): + return + with self.lock: + try: + keep_services = self.api_client.keep_services().accessible() + except Exception: # API server predates Keep services. + keep_services = self.api_client.keep_disks().list() + + self._keep_services = keep_services.execute().get('items') + if not self._keep_services: + raise arvados.errors.NoKeepServersError() + + self.using_proxy = any(ks.get('service_type') == 'proxy' + for ks in self._keep_services) + + # Precompute the base URI for each service. + for r in self._keep_services: + r['_service_root'] = "{}://[{}]:{:d}/".format( + 'https' if r['service_ssl_flag'] else 'http', + r['service_host'], + r['service_port']) + _logger.debug(str(self._keep_services)) + + def _service_weight(self, data_hash, service_uuid): + """Compute the weight of a Keep service endpoint for a data + block with a known hash. + + The weight is md5(h + u) where u is the last 15 characters of + the service endpoint's UUID. + """ + return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest() - def size(self): - if self.content == None: - return 0 - else: - return len(self.content) + def weighted_service_roots(self, data_hash, force_rebuild=False): + """Return an array of Keep service endpoints, in the order in + which they should be probed when reading or writing data with + the given hash. + """ + self.build_services_list(force_rebuild) + + # Sort the available services by weight (heaviest first) for + # this data_hash, and return their service_roots (base URIs) + # in that order. 
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
+        """Return an array of Keep service endpoints, in the order in
+        which they should be probed when reading or writing data with
+        the given hash.
+        """
+        self.build_services_list(force_rebuild)
+
+        # Sort the available services by weight (heaviest first) for
+        # this data_hash, and return their service_roots (base URIs)
+        # in that order.
+        sorted_roots = [
+            svc['_service_root'] for svc in sorted(
+                self._keep_services,
+                reverse=True,
+                key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
+        return sorted_roots
+
+    def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
+        # roots_map is a dictionary, mapping Keep service root strings
+        # to KeepService objects.  Poll for Keep services, and add any
+        # new ones to roots_map.  Return the current list of local
+        # root strings.
+        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
+        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
+        for root in local_roots:
+            if root not in roots_map:
+                roots_map[root] = self.KeepService(root, **headers)
+        return local_roots
 
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
+    @staticmethod
+    def _check_loop_result(result):
+        # KeepClient RetryLoops should save results as a 2-tuple: the
+        # actual result of the request, and the number of servers available
+        # to receive the request this round.
+        # This method returns True if there's a real result, False if
+        # there are no more servers available, otherwise None.
+        if isinstance(result, Exception):
+            return None
+        result, tried_server_count = result
+        if (result is not None) and (result is not False):
+            return True
+        elif tried_server_count < 1:
+            _logger.info("No more Keep services to try; giving up")
+            return False
+        else:
+            return None
 
-    def reserve_cache(self, locator):
+    @retry.retry_method
+    def get(self, loc_s, num_retries=None):
+        """Get data from Keep.
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
+
+        This method fetches one or more blocks of data from Keep.  It
+        sends a request to each Keep service registered with the API
+        server (or the proxy provided when this client was
+        instantiated), then each service named in location hints, in
+        sequence.  As soon as one service provides the data, it's
+        returned.
 
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
+        Arguments:
+        * loc_s: A string of one or more comma-separated locators to fetch.
+          This method returns the concatenation of these blocks.
+        * num_retries: The number of times to retry GET requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  Note that, in each loop, the method may try
+          to fetch data from every available Keep service, along with any
+          that are named in location hints in the locator.  The default value
+          is set when the KeepClient is initialized.
+ """ if ',' in loc_s: return ''.join(self.get(x) for x in loc_s.split(',')) locator = KeepLocator(loc_s) expect_hash = locator.md5sum - slot, first = self.reserve_cache(expect_hash) - + slot, first = self.block_cache.reserve_cache(expect_hash) if not first: v = slot.get() return v - try: - for service_root in self.shuffled_service_roots(expect_hash): - url = service_root + loc_s - headers = {'Authorization': "OAuth2 %s" % (self.api_token,), - 'Accept': 'application/octet-stream'} - blob = self.get_url(url, headers, expect_hash) - if blob: - slot.set(blob) - self.cap_cache() - return blob - - for hint in locator.hints: - if not hint.startswith('K@'): - continue - url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s - blob = self.get_url(url, {}, expect_hash) - if blob: - slot.set(blob) - self.cap_cache() - return blob - except: - slot.set(None) - self.cap_cache() - raise - - slot.set(None) - self.cap_cache() - raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash) - - def get_url(self, url, headers, expect_hash): - h = httplib2.Http() - try: - _logger.debug("Request: GET %s", url) - with timer.Timer() as t: - resp, content = h.request(url.encode('utf-8'), 'GET', - headers=headers) - _logger.info("Received %s bytes in %s msec (%s MiB/sec)", - len(content), t.msecs, - (len(content)/(1024*1024))/t.secs) - if re.match(r'^2\d\d$', resp['status']): - md5 = hashlib.md5(content).hexdigest() - if md5 == expect_hash: - return content - _logger.warning("Checksum fail: md5(%s) = %s", url, md5) - except Exception as e: - _logger.debug("Request fail: GET %s => %s: %s", - url, type(e), str(e)) - return None - - def put(self, data, copies=2): + # See #3147 for a discussion of the loop implementation. Highlights: + # * Refresh the list of Keep services after each failure, in case + # it's being updated. + # * Retry until we succeed, we're out of retries, or every available + # service has returned permanent failure. + hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:]) + for hint in locator.hints if hint.startswith('K@')] + # Map root URLs their KeepService objects. + roots_map = {root: self.KeepService(root) for root in hint_roots} + blob = None + loop = retry.RetryLoop(num_retries, self._check_loop_result, + backoff_start=2) + for tries_left in loop: + try: + local_roots = self.map_new_services( + roots_map, expect_hash, + force_rebuild=(tries_left < num_retries)) + except Exception as error: + loop.save_result(error) + continue + + # Query KeepService objects that haven't returned + # permanent failure, in our specified shuffle order. + services_to_try = [roots_map[root] + for root in (local_roots + hint_roots) + if roots_map[root].usable()] + for keep_service in services_to_try: + blob = keep_service.get(locator, timeout=self.current_timeout()) + if blob is not None: + break + loop.save_result((blob, len(services_to_try))) + + # Always cache the result, then return it if we succeeded. + slot.set(blob) + self.block_cache.cap_cache() + if loop.success(): + return blob + + # No servers fulfilled the request. Count how many responded + # "not found;" if the ratio is high enough (currently 75%), report + # Not Found; otherwise a generic error. + # Q: Including 403 is necessary for the Keep tests to continue + # passing, but maybe they should expect KeepReadError instead? 
+
+    @retry.retry_method
+    def put(self, data, copies=2, num_retries=None):
+        """Save data in Keep.
+
+        This method will get a list of Keep services from the API server, and
+        send the data to each one simultaneously, in a separate thread per
+        service.  Once the uploads are finished, if enough copies are saved,
+        this method returns the most recent HTTP response body.  If requests
+        fail to upload enough copies, this method raises KeepWriteError.
+
+        Arguments:
+        * data: The string of data to upload.
+        * copies: The number of copies that the user requires be saved.
+          Default 2.
+        * num_retries: The number of times to retry PUT requests to
+          *each* Keep server if it returns temporary failures, with
+          exponential backoff.  The default value is set when the
+          KeepClient is initialized.
+        """
         data_hash = hashlib.md5(data).hexdigest()
-        have_copies = 0
-        want_copies = copies
-        if not (want_copies > 0):
+        if copies < 1:
             return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(
-                self.api_token,
-                data=data,
-                data_hash=data_hash,
-                service_root=service_root,
-                thread_limiter=thread_limiter,
-                timeout=self.timeout,
-                using_proxy=self.using_proxy,
-                want_copies=(want_copies if self.using_proxy else 1))
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        if thread_limiter.done() < want_copies:
-            # Retry the threads (i.e., services) that failed the first
-            # time around.
-            threads_retry = []
+
+        headers = {}
+        if self.using_proxy:
+            # Tell the proxy how many copies we want it to store
+            headers['X-Keep-Desired-Replication'] = str(copies)
+        roots_map = {}
+        thread_limiter = KeepClient.ThreadLimiter(copies)
+        loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                               backoff_start=2)
+        for tries_left in loop:
+            try:
+                local_roots = self.map_new_services(
+                    roots_map, data_hash,
+                    force_rebuild=(tries_left < num_retries), **headers)
+            except Exception as error:
+                loop.save_result(error)
+                continue
+
+            threads = []
+            for service_root, ks in roots_map.iteritems():
+                if ks.finished():
+                    continue
+                t = KeepClient.KeepWriterThread(
+                    ks,
+                    data=data,
+                    data_hash=data_hash,
+                    service_root=service_root,
+                    thread_limiter=thread_limiter,
+                    timeout=self.current_timeout())
+                t.start()
+                threads.append(t)
             for t in threads:
-                if not t.success():
-                    _logger.debug("Retrying: PUT %s %s",
-                                  t.args['service_root'],
-                                  t.args['data_hash'])
-                    retry_with_args = t.args.copy()
-                    t_retry = KeepClient.KeepWriterThread(self.api_token,
-                                                          **retry_with_args)
-                    t_retry.start()
-                    threads_retry += [t_retry]
-            for t in threads_retry:
                 t.join()
-            have_copies = thread_limiter.done()
-            # If we're done, return the response from Keep
-            if have_copies >= want_copies:
+            loop.save_result((thread_limiter.done() >= copies, len(threads)))
+
+        if loop.success():
             return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
-
+            (data_hash, copies, thread_limiter.done()))
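
put() above fans out one KeepWriterThread per service and lets the shared
ThreadLimiter tally replicas as each thread reports in. The same coordination
pattern in miniature, with a simplified stand-in for the SDK's ThreadLimiter
(whose full definition falls outside this diff):

    import threading

    class ReplicaCounter(object):
        # Simplified stand-in: threads report how many replicas their
        # server stored; the most recent response body wins.
        def __init__(self):
            self._lock = threading.Lock()
            self._done = 0
            self._response = None

        def save_response(self, response_body, replicas_stored):
            with self._lock:
                self._done += replicas_stored
                self._response = response_body

        def done(self):
            with self._lock:
                return self._done

        def response(self):
            with self._lock:
                return self._response

    def fake_writer(name, counter):
        # A real KeepWriterThread would PUT the block to one service;
        # here every "service" just claims it stored one copy.
        counter.save_response('%s+3' % name, 1)

    counter = ReplicaCounter()
    threads = [threading.Thread(target=fake_writer, args=(name, counter))
               for name in ('keep0', 'keep1', 'keep2')]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert counter.done() >= 2  # success when done() reaches copies=2
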
 
-    def local_store_put(self, data):
+    # Local storage methods need no-op num_retries arguments to keep
+    # integration tests happy.  With better isolation they could
+    # probably be removed again.
+    def local_store_put(self, data, num_retries=0):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -571,7 +730,7 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s):
+    def local_store_get(self, loc_s, num_retries=0):
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
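
The local-store path above swaps flat files in for Keep servers, which is how
the test suite exercises client code without a cluster. A sketch of the round
trip, assuming a writable temporary directory and no API credentials (local
mode never contacts the API server):

    import os
    import tempfile

    import arvados.keep

    os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
    keep = arvados.keep.KeepClient()  # binds get/put to the local store

    locator = keep.put('foo')  # writes <md5>.tmp, then renames to <md5>
    assert locator.startswith('acbd18db4cc2f85cedef654fccc4a4d8')
    assert keep.get(locator) == 'foo'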