global_client_object = None
-from arvados import *
+from api import *
+import config
+import arvados.errors
class Keep:
@staticmethod
self.args = kwargs
def run(self):
- global config
with self.args['thread_limiter'] as limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
self.args['service_root']))
h = httplib2.Http()
url = self.args['service_root'] + self.args['data_hash']
- api_token = config['ARVADOS_API_TOKEN']
+ api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
try:
resp, content = h.request(url.encode('utf-8'), 'PUT',
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- keep_disks = api().keep_disks().list().execute()['items']
+ keep_disks = arvados.api().keep_disks().list().execute()['items']
roots = (("http%s://%s:%d/" %
('s' if f['service_ssl_flag'] else '',
f['service_host'],
return pseq
def get(self, locator):
- global config
if re.search(r',', locator):
return ''.join(self.get(x) for x in locator.split(','))
if 'KEEP_LOCAL_STORE' in os.environ:
return KeepClient.local_store_get(locator)
expect_hash = re.sub(r'\+.*', '', locator)
for service_root in self.shuffled_service_roots(expect_hash):
- h = httplib2.Http()
url = service_root + expect_hash
- api_token = config['ARVADOS_API_TOKEN']
+ api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token,
'Accept': 'application/octet-stream'}
- try:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- if re.match(r'^2\d\d$', resp['status']):
- m = hashlib.new('md5')
- m.update(content)
- md5 = m.hexdigest()
- if md5 == expect_hash:
- return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
- except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
- raise errors.NotFoundError("Block not found: %s" % expect_hash)
+ blob = self.get_url(url, headers, expect_hash)
+ if blob:
+ return blob
+ for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
+ instance = location_hint.group(1)
+ url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
+ blob = self.get_url(url, {}, expect_hash)
+ if blob:
+ return blob
+ raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
+
+ def get_url(self, url, headers, expect_hash):
+ h = httplib2.Http()
+ try:
+ resp, content = h.request(url.encode('utf-8'), 'GET',
+ headers=headers)
+ if re.match(r'^2\d\d$', resp['status']):
+ m = hashlib.new('md5')
+ m.update(content)
+ md5 = m.hexdigest()
+ if md5 == expect_hash:
+ return content
+ logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
+ except Exception as e:
+ logging.info("Request fail: GET %s => %s: %s" %
+ (url, type(e), str(e)))
+ return None
def put(self, data, **kwargs):
if 'KEEP_LOCAL_STORE' in os.environ:
have_copies = thread_limiter.done()
if have_copies == want_copies:
return (data_hash + '+' + str(len(data)))
- raise errors.KeepWriteError(
+ raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %
(data_hash, want_copies, have_copies))
def local_store_get(locator):
r = re.search('^([0-9a-f]{32,})', locator)
if not r:
- raise errors.NotFoundError(
+ raise arvados.errors.NotFoundError(
"Invalid data locator: '%s'" % locator)
- if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
+ if r.group(0) == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
return ''
with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
return f.read()