import datetime
import ssl
+_logger = logging.getLogger('arvados.keep')
global_client_object = None
from api import *
self.run_with_limiter(limiter)
def run_with_limiter(self, limiter):
- logging.debug("KeepWriterThread %s proceeding %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
+ _logger.debug("KeepWriterThread %s proceeding %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
h = httplib2.Http(timeout=self.args.get('timeout', None))
url = self.args['service_root'] + self.args['data_hash']
api_token = config.get('ARVADOS_API_TOKEN')
headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
try:
- logging.debug("Uploading to {}".format(url))
+ _logger.debug("Uploading to {}".format(url))
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
body=self.args['data'])
body=body)
if re.match(r'^2\d\d$', resp['status']):
self._success = True
- logging.debug("KeepWriterThread %s succeeded %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
replicas_stored = 1
if 'x-keep-replicas-stored' in resp:
# Tick the 'done' counter for the number of replica
pass
limiter.save_response(content.strip(), replicas_stored)
else:
- logging.warning("Request fail: PUT %s => %s %s" %
- (url, resp['status'], content))
+ _logger.warning("Request fail: PUT %s => %s %s",
+ url, resp['status'], content)
except (httplib2.HttpLib2Error,
httplib.HTTPException,
ssl.SSLError) as e:
# When using https, timeouts look like ssl.SSLError from here.
# "SSLError: The write operation timed out"
- logging.warning("Request fail: PUT %s => %s: %s" %
- (url, type(e), str(e)))
+ _logger.warning("Request fail: PUT %s => %s: %s",
+ url, type(e), str(e))
def __init__(self, **kwargs):
self.lock = threading.Lock()
f['service_port']))
for f in keep_services)
self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
+ _logger.debug(str(self.service_roots))
finally:
self.lock.release()
# Remove the digits just used from the seed
seed = seed[8:]
- logging.debug(str(pseq))
+ _logger.debug(str(pseq))
return pseq
class CacheSlot(object):
self._cache_lock.release()
def get(self, locator):
- #logging.debug("Keep.get %s" % (locator))
-
if re.search(r',', locator):
return ''.join(self.get(x) for x in locator.split(','))
if 'KEEP_LOCAL_STORE' in os.environ:
expect_hash = re.sub(r'\+.*', '', locator)
slot, first = self.reserve_cache(expect_hash)
- #logging.debug("%s %s %s" % (slot, first, expect_hash))
if not first:
v = slot.get()
def get_url(self, url, headers, expect_hash):
    """Fetch ``url`` with an HTTP GET and return the body if it verifies.

    Args:
        url: Address to request; it is UTF-8 encoded before being sent.
        headers: Header mapping passed through to the HTTP request.
        expect_hash: Hex MD5 digest that the response body must match.

    Returns:
        The response body when the status is 2xx and the MD5 hex digest of
        the body equals ``expect_hash``; ``None`` in every other case.

    No exception escapes this method: anything raised while requesting or
    checking the response is logged at info level and reported as ``None``.
    A checksum mismatch is logged as a warning.
    """
    h = httplib2.Http()
    try:
        _logger.info("Request: GET %s", url)
        with timer.Timer() as t:
            resp, content = h.request(url.encode('utf-8'), 'GET',
                                      headers=headers)
        # Use a float divisor: under Python 2 integer division would report
        # 0 MiB for any response smaller than one mebibyte.
        mebibytes = len(content) / (1024.0 * 1024)
        elapsed = t.secs
        # A zero elapsed time must not raise here: the ZeroDivisionError
        # would land in the broad handler below and a good response would
        # be thrown away just because the statistics could not be computed.
        rate = mebibytes / elapsed if elapsed else float('inf')
        _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
                     len(content), t.msecs, rate)
        if re.match(r'^2\d\d$', resp['status']):
            m = hashlib.new('md5')
            m.update(content)
            md5 = m.hexdigest()
            if md5 == expect_hash:
                return content
            _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
    except Exception as e:
        # Deliberately best-effort: report the failure and fall through to
        # the None return instead of propagating.
        _logger.info("Request fail: GET %s => %s: %s",
                     url, type(e), str(e))
    return None
def put(self, data, **kwargs):
threads_retry = []
for t in threads:
if not t.success():
- logging.warning("Retrying: PUT %s %s" % (
- t.args['service_root'],
- t.args['data_hash']))
+ _logger.warning("Retrying: PUT %s %s",
+ t.args['service_root'],
+ t.args['data_hash'])
retry_with_args = t.args.copy()
t_retry = KeepClient.KeepWriterThread(**retry_with_args)
t_retry.start()