+class KeepBlockCache(object):
+ # Default RAM cache is 256MiB
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
+ self.cache_max = cache_max
+ self._cache = []
+ self._cache_lock = threading.Lock()
+
+ class CacheSlot(object):
+ def __init__(self, locator):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+ if self.content == None:
+ return 0
+ else:
+ return len(self.content)
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ self._cache_lock.acquire()
+ try:
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ while len(self._cache) > 0 and sm > self.cache_max:
+ for i in xrange(len(self._cache)-1, -1, -1):
+ if self._cache[i].ready.is_set():
+ del self._cache[i]
+ break
+ sm = sum([slot.size() for slot in self._cache])
+ finally:
+ self._cache_lock.release()
+
+ def reserve_cache(self, locator):
+ '''Reserve a cache slot for the specified locator,
+ or return the existing slot.'''
+ self._cache_lock.acquire()
+ try:
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n, False
+
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
+ finally:
+ self._cache_lock.release()