global_client_object = None
-from arvados import *
+from api import *
+import config
+import arvados.errors
class Keep:
@staticmethod
self.args = kwargs
def run(self):
- global config
with self.args['thread_limiter'] as limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
self.args['service_root']))
h = httplib2.Http()
url = self.args['service_root'] + self.args['data_hash']
- api_token = config['ARVADOS_API_TOKEN']
+ api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
try:
resp, content = h.request(url.encode('utf-8'), 'PUT',
def __init__(self, cache_max=256 * 1024 * 1024):
    """Initialize client state and an empty in-memory block cache.

    Args:
        cache_max: upper bound, in bytes, on the total size of block
            data held in the cache. Defaults to 256 MiB.
    """
    self.lock = threading.Lock()
    self.service_roots = None
    # Guards self._cache. Entries are [locator, data] pairs; the cache
    # helpers insert/move entries to index 0, so the list runs from most
    # to least recently used.
    self._cache_lock = threading.Lock()
    self._cache = []
    self._cache_max = cache_max
def shuffled_service_roots(self, hash):
if self.service_roots == None:
return pseq
def get(self, locator):
- global config
if re.search(r',', locator):
return ''.join(self.get(x) for x in locator.split(','))
if 'KEEP_LOCAL_STORE' in os.environ:
return KeepClient.local_store_get(locator)
expect_hash = re.sub(r'\+.*', '', locator)
+
+ c = self.check_cache(expect_hash)
+ if c:
+ return c
+
for service_root in self.shuffled_service_roots(expect_hash):
- h = httplib2.Http()
url = service_root + expect_hash
- api_token = config['ARVADOS_API_TOKEN']
+ api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token,
'Accept': 'application/octet-stream'}
- try:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- if re.match(r'^2\d\d$', resp['status']):
- m = hashlib.new('md5')
- m.update(content)
- md5 = m.hexdigest()
- if md5 == expect_hash:
- return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
- except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
- raise errors.NotFoundError("Block not found: %s" % expect_hash)
+ blob = self.get_url(url, headers, expect_hash)
+ if blob:
+ self.put_cache(expect_hash, blob)
+ return blob
+
+ for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
+ instance = location_hint.group(1)
+ url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
+ blob = self.get_url(url, {}, expect_hash)
+ if blob:
+ self.put_cache(expect_hash, blob)
+ return blob
+ raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+
def get_url(self, url, headers, expect_hash):
    """GET *url* and return its body only if it checks out.

    The body is returned when the response status is 2xx and the MD5 hex
    digest of the body equals *expect_hash*; in every other case the
    result is None. A checksum mismatch is logged as a warning. Any
    exception raised inside the request/verify step is logged at INFO
    level and swallowed, so the caller can move on to another URL.
    """
    http = httplib2.Http()
    try:
        resp, content = http.request(url.encode('utf-8'), 'GET',
                                     headers=headers)
        if not re.match(r'^2\d\d$', resp['status']):
            return None
        digest = hashlib.md5(content).hexdigest()
        if digest == expect_hash:
            return content
        logging.warning("Checksum fail: md5(%s) = %s" % (url, digest))
    except Exception as e:
        logging.info("Request fail: GET %s => %s: %s" %
                     (url, type(e), str(e)))
    return None
def put(self, data, **kwargs):
if 'KEEP_LOCAL_STORE' in os.environ:
have_copies = thread_limiter.done()
if have_copies == want_copies:
return (data_hash + '+' + str(len(data)))
- raise errors.KeepWriteError(
+ raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
def put_cache(self, locator, data):
    """Put a block into the in-memory cache as the most recently used entry.

    If *locator* is already cached it is just moved to the front. Before
    inserting, least-recently-used entries are evicted from the tail
    until the total cached size (including *data*) fits within
    self._cache_max. A block larger than self._cache_max on its own is
    not cached, since it could never fit and would only flush the cache.
    """
    size = len(data)
    if size > self._cache_max:
        return
    # The lookup, eviction and insert all happen under one lock hold so
    # concurrent callers cannot insert the same locator twice.
    # (check_cache is not reused here: threading.Lock is not reentrant.)
    with self._cache_lock:
        for i, entry in enumerate(self._cache):
            if entry[0] == locator:
                del self._cache[i]
                self._cache.insert(0, entry)
                return
        total = sum(len(entry[1]) for entry in self._cache) + size
        while self._cache and total > self._cache_max:
            evicted = self._cache.pop()
            total -= len(evicted[1])
        # now add the new block at the front of the list
        self._cache.insert(0, [locator, data])
+
def check_cache(self, locator):
    """Get a block from the cache, or None if *locator* is not cached.

    A hit also moves the entry to the front of the list, marking it as
    most recently used.
    """
    # `with` guarantees the same lock that was acquired is released, even
    # on an early return.
    with self._cache_lock:
        for i, entry in enumerate(self._cache):
            if entry[0] == locator:
                del self._cache[i]
                self._cache.insert(0, entry)
                return entry[1]
    return None
+
@staticmethod
def sign_for_old_server(data_hash, data):
    """Prefix *data* with a PGP-clearsign-style envelope.

    The envelope embeds the current Unix time (as an integer) and
    *data_hash*; the signature section is empty. Presumably this is the
    format expected by older Keep servers -- TODO confirm.
    """
    envelope = ("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n"
                "-----BEGIN PGP SIGNATURE-----\n\n"
                "-----END PGP SIGNATURE-----\n")
    timestamp = int(time.time())
    return (envelope % (timestamp, data_hash)) + data
def local_store_get(locator):
    """Read a block from the directory named by $KEEP_LOCAL_STORE.

    Only the leading run of 32+ lowercase hex digits of *locator* is used
    as the file name, so any '+...' suffix is ignored and the name can
    never contain a path separator. The empty block is answered without
    touching the filesystem.

    Raises:
        arvados.errors.NotFoundError: *locator* does not start with a
            hex hash.
    """
    r = re.search('^([0-9a-f]{32,})', locator)
    if not r:
        raise arvados.errors.NotFoundError(
            "Invalid data locator: '%s'" % locator)
    if r.group(0) == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
        return ''
    # Block contents are arbitrary bytes: open in binary mode so no
    # newline translation (or, on Python 3, text decoding) is applied.
    path = os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0))
    with open(path, 'rb') as f:
        return f.read()