import threading
import timer
import datetime
+import ssl
+_logger = logging.getLogger('arvados.keep')
global_client_object = None
from api import *
import config
import arvados.errors
+import arvados.util
# KeepLocator parses and validates Keep locator strings (hash+size+hints).
# NOTE(review): this span is a patch fragment — several context lines are
# missing, so statements below are visibly truncated mid-expression.
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
# Removed by the patch: the class-local hex regex, superseded by
# arvados.util.is_hex (see the '+' lines further down).
- HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
def __init__(self, locator_str):
self.size = None
self.permission_hint()]
if s is not None)
# Removed by the patch: the private _is_hex_length helper.  Its two call
# sites below are rewritten to call arvados.util.is_hex with the same
# arguments (exact length, or a min/max length pair).
- def _is_hex_length(self, s, *size_spec):
- if len(size_spec) == 1:
- good_len = (len(s) == size_spec[0])
- else:
- good_len = (size_spec[0] <= len(s) <= size_spec[1])
- return good_len and self.HEX_RE.match(s)
-
def _make_hex_prop(name, length):
# Build and return a new property with the given name that
# must be a hex string of the given length.
def getter(self):
return getattr(self, data_name)
def setter(self, hex_str):
# Validation now delegated to the shared arvados.util.is_hex helper;
# same contract as the removed _is_hex_length (exact-length form).
- if not self._is_hex_length(hex_str, length):
+ if not arvados.util.is_hex(hex_str, length):
raise ValueError("{} must be a {}-digit hex string: {}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
@perm_expiry.setter
def perm_expiry(self, value):
# Range form: a hex Unix timestamp may be 1 to 8 hex digits.
- if not self._is_hex_length(value, 1, 8):
+ if not arvados.util.is_hex(value, 1, 8):
raise ValueError(
"permission timestamp must be a hex Unix timestamp: {}".
format(value))
# KeepWriterThread constructor: stores all writer parameters (data,
# data_hash, service_root, thread_limiter, timeout, ...) in self.args.
def __init__(self, **kwargs):
super(KeepClient.KeepWriterThread, self).__init__()
self.args = kwargs
# Patch adds a per-thread success flag so put() can identify which
# service writes failed and retry only those threads.
+ self._success = False
+
+ def success(self):
+ return self._success
def run(self):
with self.args['thread_limiter'] as limiter:
# NOTE(review): a guard condition line appears to be missing from this
# fragment — a bare 'return' directly under 'with' would make the rest
# unreachable, so presumably an 'if not limiter...' line was elided by
# the diff context.  Verify against the full file.
# My turn arrived, but the job has been done without
# me.
return
# Removed by the patch: the inline upload logic below, which moves
# into the new run_with_limiter() method (with timeout support and
# module-level _logger instead of the root logging module).
- logging.debug("KeepWriterThread %s proceeding %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
- h = httplib2.Http()
- url = self.args['service_root'] + self.args['data_hash']
- api_token = config.get('ARVADOS_API_TOKEN')
- headers = {'Authorization': "OAuth2 %s" % api_token}
-
- if self.args['using_proxy']:
- # We're using a proxy, so tell the proxy how many copies we
- # want it to store
- headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
-
- try:
- logging.debug("Uploading to {}".format(url))
+ self.run_with_limiter(limiter)
+
# New method introduced by the patch: performs one PUT of self.args['data']
# to self.args['service_root'], honoring an optional per-request timeout.
# Sets self._success and reports stored-replica count to the limiter.
+ def run_with_limiter(self, limiter):
# Logging switched to the module-level _logger with lazy %-style args
# (formatting deferred until the record is actually emitted).
+ _logger.debug("KeepWriterThread %s proceeding %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
# timeout is optional; httplib2.Http accepts None for "no timeout".
+ h = httplib2.Http(timeout=self.args.get('timeout', None))
+ url = self.args['service_root'] + self.args['data_hash']
+ api_token = config.get('ARVADOS_API_TOKEN')
+ headers = {'Authorization': "OAuth2 %s" % api_token}
+
+ if self.args['using_proxy']:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
+ try:
+ _logger.debug("Uploading to {}".format(url))
+ resp, content = h.request(url.encode('utf-8'), 'PUT',
+ headers=headers,
+ body=self.args['data'])
# Legacy-server compatibility: a 401 with this exact message means the
# server wants a re-signed body; retry once with a fresh connection.
+ if (resp['status'] == '401' and
+ re.match(r'Timestamp verification failed', content)):
+ body = KeepClient.sign_for_old_server(
+ self.args['data_hash'],
+ self.args['data'])
+ h = httplib2.Http(timeout=self.args.get('timeout', None))
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
- body=self.args['data'])
# Removed by the patch: the old (pre-refactor) copy of this upload
# logic, which lacked the success flag, the timeout, and ssl.SSLError
# handling.  Its replacement continues in the '+' lines below.
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http()
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
- if re.match(r'^2\d\d$', resp['status']):
- logging.debug("KeepWriterThread %s succeeded %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
- replicas_stored = 1
- if 'x-keep-replicas-stored' in resp:
- # Tick the 'done' counter for the number of replica
- # reported stored by the server, for the case that
- # we're talking to a proxy or other backend that
- # stores to multiple copies for us.
- try:
- replicas_stored = int(resp['x-keep-replicas-stored'])
- except ValueError:
- pass
- return limiter.save_response(content.strip(), replicas_stored)
-
- logging.warning("Request fail: PUT %s => %s %s" %
- (url, resp['status'], content))
- except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
- logging.warning("Request fail: PUT %s => %s: %s" %
- (url, type(e), str(e)))
-
- def __init__(self):
+ body=body)
+ if re.match(r'^2\d\d$', resp['status']):
# Any 2xx status counts as success; record it for put()'s retry pass.
+ self._success = True
+ _logger.debug("KeepWriterThread %s succeeded %s %s",
+ str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root'])
+ replicas_stored = 1
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter for the number of replica
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores to multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except ValueError:
+ pass
+ limiter.save_response(content.strip(), replicas_stored)
+ else:
+ _logger.warning("Request fail: PUT %s => %s %s",
+ url, resp['status'], content)
# ssl.SSLError added to the caught exceptions (see comment below for why).
+ except (httplib2.HttpLib2Error,
+ httplib.HTTPException,
+ ssl.SSLError) as e:
+ # When using https, timeouts look like ssl.SSLError from here.
+ # "SSLError: The write operation timed out"
+ _logger.warning("Request fail: PUT %s => %s: %s",
+ url, type(e), str(e))
+
# KeepClient constructor.  The patch widens the signature to **kwargs
# (backward compatible — all keywords are optional) to accept 'timeout'.
+ def __init__(self, **kwargs):
self.lock = threading.Lock()
self.service_roots = None
self._cache_lock = threading.Lock()
# default 256 megabyte cache
self.cache_max = 256 * 1024 * 1024
self.using_proxy = False
# Per-request HTTP timeout in seconds, forwarded to each
# KeepWriterThread by put(); defaults to 60.
+ self.timeout = kwargs.get('timeout', 60)
# NOTE(review): fragment — the body between the service_roots check and
# the roots/pseq construction is elided by the diff context.
def shuffled_service_roots(self, hash):
if self.service_roots == None:
f['service_port']))
for f in keep_services)
self.service_roots = sorted(set(roots))
# Only change here: root 'logging' module swapped for the module _logger.
- logging.debug(str(self.service_roots))
+ _logger.debug(str(self.service_roots))
finally:
self.lock.release()
# Remove the digits just used from the seed
seed = seed[8:]
- logging.debug(str(pseq))
+ _logger.debug(str(pseq))
return pseq
# NOTE(review): fragment — the CacheSlot class body and most of get() are
# elided by the diff context; only the changed lines are visible.
class CacheSlot(object):
self._cache_lock.release()
def get(self, locator):
# Patch deletes a commented-out debug line here (dead code removal).
- #logging.debug("Keep.get %s" % (locator))
-
if re.search(r',', locator):
return ''.join(self.get(x) for x in locator.split(','))
if 'KEEP_LOCAL_STORE' in os.environ:
expect_hash = re.sub(r'\+.*', '', locator)
slot, first = self.reserve_cache(expect_hash)
# Patch deletes another commented-out debug line (dead code removal).
- #logging.debug("%s %s %s" % (slot, first, expect_hash))
if not first:
v = slot.get()
# GET one URL, time the transfer, and verify the body's md5 against
# expect_hash.  Returns the content on success, None on any failure.
# Only change in this hunk: logging moved to _logger with lazy %-args.
def get_url(self, url, headers, expect_hash):
h = httplib2.Http()
try:
- logging.info("Request: GET %s" % (url))
+ _logger.info("Request: GET %s", url)
with timer.Timer() as t:
resp, content = h.request(url.encode('utf-8'), 'GET',
headers=headers)
- logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
- t.msecs,
- (len(content)/(1024*1024))/t.secs))
+ _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
+ len(content), t.msecs,
+ (len(content)/(1024*1024))/t.secs)
if re.match(r'^2\d\d$', resp['status']):
m = hashlib.new('md5')
m.update(content)
md5 = m.hexdigest()
if md5 == expect_hash:
return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
+ _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
# NOTE(review): the broad 'except Exception' predates this patch; it is
# deliberate best-effort (caller treats None as "try the next service").
except Exception as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
+ _logger.info("Request fail: GET %s => %s: %s",
+ url, type(e), str(e))
return None
# Write data to want_copies Keep services in parallel, one writer thread
# per shuffled service root.  NOTE(review): fragment — the lines defining
# data_hash and want_copies are elided by the diff context.
def put(self, data, **kwargs):
threads = []
thread_limiter = KeepClient.ThreadLimiter(want_copies)
for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter,
- using_proxy=self.using_proxy,
- want_copies=(want_copies if self.using_proxy else 1))
# Same constructor call as before, plus the new timeout kwarg so each
# writer's HTTP request honors self.timeout.
+ t = KeepClient.KeepWriterThread(
+ data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter,
+ timeout=self.timeout,
+ using_proxy=self.using_proxy,
+ want_copies=(want_copies if self.using_proxy else 1))
t.start()
threads += [t]
for t in threads:
t.join()
# New in this patch: one additional retry pass.  If fewer than
# want_copies replicas were confirmed, re-run a fresh writer thread for
# each service whose first attempt failed (t.success() is False), using
# a copy of the original thread's args, then wait for them all.
+ if thread_limiter.done() < want_copies:
+ # Retry the threads (i.e., services) that failed the first
+ # time around.
+ threads_retry = []
+ for t in threads:
+ if not t.success():
+ _logger.warning("Retrying: PUT %s %s",
+ t.args['service_root'],
+ t.args['data_hash'])
+ retry_with_args = t.args.copy()
+ t_retry = KeepClient.KeepWriterThread(**retry_with_args)
+ t_retry.start()
+ threads_retry += [t_retry]
+ for t in threads_retry:
+ t.join()
have_copies = thread_limiter.done()
# If we're done, return the response from Keep
if have_copies >= want_copies: