From: Peter Amstutz
Date: Thu, 20 Feb 2014 03:15:44 +0000 (+0000)
Subject: Fixed caching so that reads by other threads for a block which is already
X-Git-Tag: 1.1.0~2736^2~10
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/426b8ae13589e4ee121a094a6f18dd89a577402d

Fixed caching so that reads by other threads for a block which is already
being downloaded will block all threads until the block is downloaded.
---

diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 93abfacdc9..f68b11cd4a 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -216,19 +216,20 @@ class KeepClient(object):
                 if self._cache[i].locator == locator:
                     n = self._cache[i]
                     if i != 0:
+                        # move it to the front
                         del self._cache[i]
                         self._cache.insert(0, n)
                     return n, False
 
             # Add a new cache slot for the locator
-            n = CacheSlot(locator)
+            n = KeepClient.CacheSlot(locator)
             self._cache.insert(0, n)
             return n, True
         finally:
             self._cache_lock.release()
 
     def get(self, locator):
-        logging.debug("Keep.get %s" % (locator))
+        #logging.debug("Keep.get %s" % (locator))
         if re.search(r',', locator):
             return ''.join(self.get(x) for x in locator.split(','))
 
@@ -237,33 +238,39 @@ class KeepClient(object):
         expect_hash = re.sub(r'\+.*', '', locator)
 
         slot, first = self.reserve_cache(expect_hash)
+        #logging.debug("%s %s %s" % (slot, first, expect_hash))
+
         if not first:
             v = slot.get()
             return v
 
-        for service_root in self.shuffled_service_roots(expect_hash):
-            url = service_root + expect_hash
-            api_token = config.get('ARVADOS_API_TOKEN')
-            headers = {'Authorization': "OAuth2 %s" % api_token,
-                       'Accept': 'application/octet-stream'}
-            blob = self.get_url(url, headers, expect_hash)
-            if blob:
-                slot.set(blob)
-                self.cap_cache()
-                return blob
-
-        for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
-            instance = location_hint.group(1)
-            url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
-            blob = self.get_url(url, {}, expect_hash)
-            if blob:
-                slot.set(blob)
-                self.cap_cache()
-                return blob
+        try:
+            for service_root in self.shuffled_service_roots(expect_hash):
+                url = service_root + expect_hash
+                api_token = config.get('ARVADOS_API_TOKEN')
+                headers = {'Authorization': "OAuth2 %s" % api_token,
+                           'Accept': 'application/octet-stream'}
+                blob = self.get_url(url, headers, expect_hash)
+                if blob:
+                    slot.set(blob)
+                    self.cap_cache()
+                    return blob
+
+            for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
+                instance = location_hint.group(1)
+                url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
+                blob = self.get_url(url, {}, expect_hash)
+                if blob:
+                    slot.set(blob)
+                    self.cap_cache()
+                    return blob
+        except:
+            slot.set(None)
+            self.cap_cache()
+            raise
 
         slot.set(None)
         self.cap_cache()
-
         raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
 
     def get_url(self, url, headers, expect_hash):
@@ -273,7 +280,9 @@ class KeepClient(object):
             with timer.Timer() as t:
                 resp, content = h.request(url.encode('utf-8'), 'GET',
                                           headers=headers)
-            logging.info("Received %s bytes in %s msec (%s bytes/sec)" % (len(content), t.msecs, len(content)/t.secs))
+            logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
+                                                                        t.msecs,
+                                                                        (len(content)/(1024*1024))/t.secs))
             if re.match(r'^2\d\d$', resp['status']):
                 m = hashlib.new('md5')
                 m.update(content)
diff --git a/sdk/python/arvados/timer.py b/sdk/python/arvados/timer.py
new file mode 100644
index 0000000000..739d0d59c0
--- /dev/null
+++ b/sdk/python/arvados/timer.py
@@ -0,0 +1,16 @@
+import time
+
+class Timer(object):
+    def __init__(self, verbose=False):
+        self.verbose = verbose
+
+    def __enter__(self):
+        self.start = time.time()
+        return self
+
+    def __exit__(self, *args):
+        self.end = time.time()
+        self.secs = self.end - self.start
+        self.msecs = self.secs * 1000  # millisecs
+        if self.verbose:
+            print 'elapsed time: %f ms' % self.msecs