import timer
import datetime
import ssl
+import socket
# Module-level logger; records are emitted under the 'arvados.keep' name.
_logger = logging.getLogger('arvados.keep')
# Starts out unset (None).
# NOTE(review): presumably holds a shared client object once one has been
# created -- confirm against the code that assigns it.
global_client_object = None
def put(data, **kwargs):
    '''Forward data and any keyword arguments to the put() method of the
    object returned by Keep.global_client_object(), and return its result.'''
    client = Keep.global_client_object()
    return client.put(data, **kwargs)
class KeepBlockCache(object):
    '''Size-capped, in-memory cache of block contents keyed by locator.

    Slots are kept in self._cache ordered from most recently reserved
    (index 0) to least recently reserved (last index).  The list is only
    read or modified while holding self._cache_lock.
    '''

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        '''Create an empty cache.

        cache_max is the total content size, in bytes, that cap_cache()
        trims the cache down to.
        '''
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        '''Holder for the content of one block.

        A slot starts out empty and "not ready".  Whoever fills it calls
        set(); readers call get(), which waits for that to happen.
        '''

        def __init__(self, locator):
            self.locator = locator
            # Set once set() has been called, whether or not it succeeded.
            self.ready = threading.Event()
            self.content = None

        def get(self):
            '''Block until set() has been called, then return the content.'''
            self.ready.wait()
            return self.content

        def set(self, value):
            '''Store value as the slot content and wake up all get() callers.

            A value of None marks the slot as failed; cap_cache() discards
            such slots.
            '''
            self.content = value
            self.ready.set()

        def size(self):
            '''Return len(content), or 0 while the slot has no content.'''
            if self.content is None:
                return 0
            return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max.

        First drops every slot that is ready but has no content (that means
        there was an error reading the block).  Then, while the summed slot
        sizes exceed cache_max, evicts the ready slot closest to the end of
        the list (the least recently reserved one).  Slots that are not
        ready yet are never evicted.
        '''
        with self._cache_lock:
            self._cache = [slot for slot in self._cache
                           if not (slot.ready.is_set() and slot.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                for i in reversed(range(len(self._cache))):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # Nothing is evictable: every remaining slot is still
                    # being filled.  Stop here rather than spin forever
                    # while holding the lock.
                    break
                total = sum(slot.size() for slot in self._cache)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, is_new) tuple.  When a slot for locator already
        exists it is moved to the front of the list and is_new is False.
        Otherwise a fresh, empty slot is inserted at the front and is_new
        is True.
        '''
        with self._cache_lock:
            # Test if the locator is already in the cache
            for i, slot in enumerate(self._cache):
                if slot.locator == locator:
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, slot)
                    return slot, False

            # Add a new cache slot for the locator
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
class KeepClient(object):
class ThreadLimiter(object):
class KeepService(object):
# Make requests to a single Keep service, and track results.
HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
- ssl.SSLError)
+ socket.error, ssl.SSLError)
def __init__(self, root, **headers):
self.root = root
self.service.last_result[1])
- def __init__(self, api_client=None, proxy=None, timeout=60,
- api_token=None, local_store=None):
+ def __init__(self, api_client=None, proxy=None, timeout=300,
+ api_token=None, local_store=None, block_cache=None):
"""Initialize a new KeepClient.
Arguments:
of the ARVADOS_KEEP_PROXY configuration setting. If you want to
ensure KeepClient does not use a proxy, pass in an empty string.
* timeout: The timeout for all HTTP requests, in seconds. Default
- 60.
+ 300.
* api_token: If you're not using an API client, but only talking
directly to a Keep proxy, this parameter specifies an API token
to authenticate Keep requests. It is an error to specify both
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+
if proxy:
if not proxy.endswith('/'):
proxy += '/'
_logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
-
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
-
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
- slot, first = self.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
# Always cache the result, then return it if we succeeded.
slot.set(blob)
- self.cap_cache()
+ self.block_cache.cap_cache()
if loop.success():
return blob