From 46f5403fdd2b3b0bd26d81d81ba509c624f54bb3 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 16 Sep 2014 14:49:38 -0400 Subject: [PATCH] 3878: Fix block cache sharing across threads so arv-mount uses a bounded amount of memory to store block. Also fixes bug in cache management code and cleans some exception handler error reporting. --- sdk/python/arvados/collection.py | 10 +- sdk/python/arvados/keep.py | 133 ++++++++++++++----------- services/fuse/arvados_fuse/__init__.py | 18 +++- 3 files changed, 94 insertions(+), 67 deletions(-) diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index f5c4066a4d..496136ebe3 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -92,9 +92,9 @@ def normalize(collection): class CollectionReader(object): - def __init__(self, manifest_locator_or_text, api_client=None): + def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None): self._api_client = api_client - self._keep_client = None + self._keep_client = keep_client if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text self._manifest_text = None @@ -153,7 +153,7 @@ class CollectionReader(object): # now regenerate the manifest text based on the normalized stream #print "normalizing", self._manifest_text - self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams]) + self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams]) #print "result", self._manifest_text @@ -161,7 +161,7 @@ class CollectionReader(object): self._populate() resp = [] for s in self._streams: - resp.append(StreamReader(s)) + resp.append(StreamReader(s, keep=self._keep_client)) return resp def all_files(self): @@ -172,7 +172,7 @@ class CollectionReader(object): def manifest_text(self, strip=False): self._populate() if strip: - m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams]) + m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams]) return m else: return self._manifest_text diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index f0a872417a..7621a9608e 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -143,6 +143,71 @@ class Keep(object): def put(data, **kwargs): return Keep.global_client_object().put(data, **kwargs) +class KeepBlockCache(object): + # Default RAM cache is 256MiB + def __init__(self, cache_max=(256 * 1024 * 1024)): + self.cache_max = cache_max + self._cache = [] + self._cache_lock = threading.Lock() + + class CacheSlot(object): + def __init__(self, locator): + self.locator = locator + self.ready = threading.Event() + self.content = None + + def get(self): + self.ready.wait() + return self.content + + def set(self, value): + self.content = value + self.ready.set() + + def size(self): + if self.content == None: + return 0 + else: + return len(self.content) + + def cap_cache(self): + '''Cap the cache size to self.cache_max''' + self._cache_lock.acquire() + try: + # Select all slots except those where ready.is_set() and content is + # None (that means there was an error reading the block). + self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)] + sm = sum([slot.size() for slot in self._cache]) + while len(self._cache) > 0 and sm > self.cache_max: + for i in xrange(len(self._cache)-1, -1, -1): + if self._cache[i].ready.is_set(): + del self._cache[i] + break + sm = sum([slot.size() for slot in self._cache]) + finally: + self._cache_lock.release() + + def reserve_cache(self, locator): + '''Reserve a cache slot for the specified locator, + or return the existing slot.''' + self._cache_lock.acquire() + try: + # Test if the locator is already in the cache + for i in xrange(0, len(self._cache)): + if self._cache[i].locator == locator: + n = self._cache[i] + if i != 0: + # move it to the front + del self._cache[i] + self._cache.insert(0, n) + return n, False + + # Add a new cache slot for the locator + n = KeepBlockCache.CacheSlot(locator) + self._cache.insert(0, n) + return n, True + finally: + self._cache_lock.release() class KeepClient(object): class ThreadLimiter(object): @@ -326,7 +391,7 @@ class KeepClient(object): def __init__(self, api_client=None, proxy=None, timeout=300, - api_token=None, local_store=None): + api_token=None, local_store=None, block_cache=None): """Initialize a new KeepClient. Arguments: @@ -362,15 +427,17 @@ class KeepClient(object): if local_store is None: local_store = os.environ.get('KEEP_LOCAL_STORE') + if block_cache is None: + raise Exception() + self.block_cache = block_cache if block_cache else KeepBlockCache() + if local_store: self.local_store = local_store self.get = self.local_store_get self.put = self.local_store_put else: self.timeout = timeout - self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB - self._cache = [] - self._cache_lock = threading.Lock() + if proxy: if not proxy.endswith('/'): proxy += '/' @@ -461,59 +528,6 @@ class KeepClient(object): _logger.debug(str(pseq)) return pseq - class CacheSlot(object): - def __init__(self, locator): - self.locator = locator - self.ready = threading.Event() - self.content = None - - def get(self): - self.ready.wait() - return self.content - - def set(self, value): - self.content = value - self.ready.set() - - def size(self): - if self.content == None: - return 0 - else: - return len(self.content) - - def cap_cache(self): - '''Cap the cache size to self.cache_max''' - self._cache_lock.acquire() - try: - self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache) - sm = sum([slot.size() for slot in self._cache]) - while sm > self.cache_max: - del self._cache[-1] - sm = sum([slot.size() for a in self._cache]) - finally: - self._cache_lock.release() - - def reserve_cache(self, locator): - '''Reserve a cache slot for the specified locator, - or return the existing slot.''' - self._cache_lock.acquire() - try: - # Test if the locator is already in the cache - for i in xrange(0, len(self._cache)): - if self._cache[i].locator == locator: - n = self._cache[i] - if i != 0: - # move it to the front - del self._cache[i] - self._cache.insert(0, n) - return n, False - - # Add a new cache slot for the locator - n = KeepClient.CacheSlot(locator) - self._cache.insert(0, n) - return n, True - finally: - self._cache_lock.release() def map_new_services(self, roots_map, md5_s, force_rebuild, **headers): # roots_map is a dictionary, mapping Keep service root strings @@ -569,7 +583,7 @@ class KeepClient(object): locator = KeepLocator(loc_s) expect_hash = locator.md5sum - slot, first = self.reserve_cache(expect_hash) + slot, first = self.block_cache.reserve_cache(expect_hash) if not first: v = slot.get() return v @@ -609,7 +623,7 @@ class KeepClient(object): # Always cache the result, then return it if we succeeded. slot.set(blob) - self.cap_cache() + self.block_cache.cap_cache() if loop.success(): return blob @@ -694,7 +708,6 @@ class KeepClient(object): os.rename(os.path.join(self.local_store, md5 + '.tmp'), os.path.join(self.local_store, md5)) return locator - def local_store_get(self, loc_s): try: locator = KeepLocator(loc_s) diff --git a/services/fuse/arvados_fuse/__init__.py b/services/fuse/arvados_fuse/__init__.py index 4d2dfee546..f49b94777b 100644 --- a/services/fuse/arvados_fuse/__init__.py +++ b/services/fuse/arvados_fuse/__init__.py @@ -34,12 +34,18 @@ class SafeApi(object): self.token = config.get('ARVADOS_API_TOKEN') self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE') self.local = threading.local() + self.block_cache = arvados.KeepBlockCache() def localapi(self): if 'api' not in self.local.__dict__: self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure) return self.local.api + def localkeep(self): + if 'keep' not in self.local.__dict__: + self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache) + return self.local.keep + def collections(self): return self.localapi().collections() @@ -307,7 +313,7 @@ class CollectionDirectory(Directory): self.collection_object_file.update(self.collection_object) self.clear() - collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api) + collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep()) for s in collection.all_streams(): cwd = self for part in s.name().split('/'): @@ -341,6 +347,10 @@ class CollectionDirectory(Directory): else: _logger.error("arv-mount %s: error", self.collection_locator) _logger.exception(detail) + except arvados.errors.ArgumentError as detail: + _logger.warning("arv-mount %s: error %s", self.collection_locator, detail) + if self.collection_object is not None and "manifest_text" in self.collection_object: + _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"]) except Exception as detail: _logger.error("arv-mount %s: error", self.collection_locator) if self.collection_object is not None and "manifest_text" in self.collection_object: @@ -796,7 +806,11 @@ class Operations(llfuse.Operations): try: with llfuse.lock_released: return handle.entry.readfrom(off, size) - except: + except arvados.errors.NotFoundError as e: + _logger.warning("Block not found: " + str(e)) + raise llfuse.FUSEError(errno.EIO) + except Exception as e: + _logger.exception(e) raise llfuse.FUSEError(errno.EIO) def release(self, fh): -- 2.30.2