def __init__(self, cache_max=256 * 1024 * 1024):
    """Initialize client state and an empty in-memory block cache.

    :param cache_max: upper bound, in bytes, on the total size of blocks
        kept in the block cache. Defaults to 256 MiB, the previous
        hard-coded value.
    """
    self.lock = threading.Lock()
    # Appears to be filled in lazily: shuffled_service_roots checks it
    # against None before use.
    self.service_roots = None
    # Dedicated lock guarding self._cache (distinct from self.lock).
    self._cache_lock = threading.Lock()
    # List of [locator, data] pairs, most recently used first.
    self._cache = []
    self._cache_max = cache_max
def shuffled_service_roots(self, hash):
if self.service_roots == None:
if 'KEEP_LOCAL_STORE' in os.environ:
return KeepClient.local_store_get(locator)
expect_hash = re.sub(r'\+.*', '', locator)
+
+ c = self.check_cache(expect_hash)
+ if c:
+ return c
+
for service_root in self.shuffled_service_roots(expect_hash):
- h = httplib2.Http()
url = service_root + expect_hash
api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token,
'Accept': 'application/octet-stream'}
- try:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- if re.match(r'^2\d\d$', resp['status']):
- m = hashlib.new('md5')
- m.update(content)
- md5 = m.hexdigest()
- if md5 == expect_hash:
- return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
- except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
+ blob = self.get_url(url, headers, expect_hash)
+ if blob:
+ self.put_cache(expect_hash, blob)
+ return blob
+
+ for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
+ instance = location_hint.group(1)
+ url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
+ blob = self.get_url(url, {}, expect_hash)
+ if blob:
+ self.put_cache(expect_hash, blob)
+ return blob
raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
def get_url(self, url, headers, expect_hash):
    """Fetch a block from `url` and verify it against `expect_hash`.

    Issues a GET with the given headers. The body is returned only when the
    response status is 2xx and the MD5 hex digest of the body equals
    `expect_hash`. A checksum mismatch is logged as a warning, and any
    exception raised along the way is logged at info level. Every failure
    path returns None.
    """
    http = httplib2.Http()
    try:
        resp, body = http.request(url.encode('utf-8'), 'GET',
                                  headers=headers)
        if not re.match(r'^2\d\d$', resp['status']):
            # Non-2xx status: treated as a silent miss.
            return None
        digest = hashlib.md5(body).hexdigest()
        if digest == expect_hash:
            return body
        logging.warning("Checksum fail: md5(%s) = %s" % (url, digest))
    except Exception as err:
        # Best-effort fetch: the caller moves on to the next server.
        logging.info("Request fail: GET %s => %s: %s" %
                     (url, type(err), str(err)))
    return None
def put(self, data, **kwargs):
if 'KEEP_LOCAL_STORE' in os.environ:
return KeepClient.local_store_put(data)
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
def put_cache(self, locator, data):
    """Put a block into the in-memory LRU cache.

    self._cache is a list of [locator, data] pairs, most recently used
    first, and its total payload size is kept at or below self._cache_max.

    - If `locator` is already cached, that entry is moved to the front and
      `data` is ignored.
    - Otherwise least-recently-used entries are evicted from the end until
      the new block fits, and then the new block is inserted at the front.
    - A block larger than self._cache_max is not cached at all. Evicting
      everything would still not make it fit.

    The whole check-evict-insert sequence runs under self._cache_lock, so
    concurrent callers cannot insert the same locator twice.
    """
    with self._cache_lock:
        for i, entry in enumerate(self._cache):
            if entry[0] == locator:
                # Already cached: just mark it most recently used.
                del self._cache[i]
                self._cache.insert(0, entry)
                return
        if len(data) > self._cache_max:
            return
        total = sum(len(entry[1]) for entry in self._cache) + len(data)
        while total > self._cache_max:
            # Evict the least recently used block (end of the list), keeping
            # a running total instead of re-summing the cache every pass.
            # The loop cannot empty the list, because len(data) alone fits.
            total -= len(self._cache.pop()[1])
        self._cache.insert(0, [locator, data])
+
def check_cache(self, locator):
    """Look up a block in the in-memory cache.

    On a hit, the entry is moved to the front of self._cache (marking it
    most recently used) and its data is returned. On a miss, returns None.
    The lookup runs under self._cache_lock; using `with` guarantees the
    same lock that was acquired is released, even if an error occurs.
    """
    with self._cache_lock:
        for i, entry in enumerate(self._cache):
            if entry[0] == locator:
                # Safe to mutate mid-iteration: we return immediately.
                del self._cache[i]
                self._cache.insert(0, entry)
                return entry[1]
    return None
+
@staticmethod
def sign_for_old_server(data_hash, data):
    """Prefix `data` with a placeholder PGP clearsign envelope.

    The envelope's signed line is "<unix time as int> <data_hash>". Its
    signature section is empty. `data` follows the envelope unchanged.
    """
    stamp = int(time.time())
    envelope = ("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n"
                "%d %s\n"
                "-----BEGIN PGP SIGNATURE-----\n\n"
                "-----END PGP SIGNATURE-----\n") % (stamp, data_hash)
    return envelope + data