X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8191d7365a61f4d4309c2f0b387188303320a889..2024ca087c3b9c99ebb792011b60fecdf1486467:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 4c2d474010..b5d5d72503 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -19,16 +19,18 @@ import time import threading import timer import datetime +import ssl +_logger = logging.getLogger('arvados.keep') global_client_object = None from api import * import config import arvados.errors +import arvados.util class KeepLocator(object): EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0) - HEX_RE = re.compile(r'^[0-9a-fA-F]+$') def __init__(self, locator_str): self.size = None @@ -53,13 +55,6 @@ class KeepLocator(object): self.permission_hint()] if s is not None) - def _is_hex_length(self, s, *size_spec): - if len(size_spec) == 1: - good_len = (len(s) == size_spec[0]) - else: - good_len = (size_spec[0] <= len(s) <= size_spec[1]) - return good_len and self.HEX_RE.match(s) - def _make_hex_prop(name, length): # Build and return a new property with the given name that # must be a hex string of the given length. @@ -67,7 +62,7 @@ class KeepLocator(object): def getter(self): return getattr(self, data_name) def setter(self, hex_str): - if not self._is_hex_length(hex_str, length): + if not arvados.util.is_hex(hex_str, length): raise ValueError("{} must be a {}-digit hex string: {}". format(name, length, hex_str)) setattr(self, data_name, hex_str) @@ -82,7 +77,7 @@ class KeepLocator(object): @perm_expiry.setter def perm_expiry(self, value): - if not self._is_hex_length(value, 1, 8): + if not arvados.util.is_hex(value, 1, 8): raise ValueError( "permission timestamp must be a hex Unix timestamp: {}". format(value)) @@ -192,6 +187,10 @@ class KeepClient(object): def __init__(self, **kwargs): super(KeepClient.KeepWriterThread, self).__init__() self.args = kwargs + self._success = False + + def success(self): + return self._success def run(self): with self.args['thread_limiter'] as limiter: @@ -199,58 +198,66 @@ class KeepClient(object): # My turn arrived, but the job has been done without # me. return - logging.debug("KeepWriterThread %s proceeding %s %s" % - (str(threading.current_thread()), - self.args['data_hash'], - self.args['service_root'])) - h = httplib2.Http() - url = self.args['service_root'] + self.args['data_hash'] - api_token = config.get('ARVADOS_API_TOKEN') - headers = {'Authorization': "OAuth2 %s" % api_token} - - if self.args['using_proxy']: - # We're using a proxy, so tell the proxy how many copies we - # want it to store - headers['X-Keep-Desired-Replication'] = str(self.args['want_copies']) - - try: - logging.debug("Uploading to {}".format(url)) + self.run_with_limiter(limiter) + + def run_with_limiter(self, limiter): + _logger.debug("KeepWriterThread %s proceeding %s %s", + str(threading.current_thread()), + self.args['data_hash'], + self.args['service_root']) + h = httplib2.Http(timeout=self.args.get('timeout', None)) + url = self.args['service_root'] + self.args['data_hash'] + api_token = config.get('ARVADOS_API_TOKEN') + headers = {'Authorization': "OAuth2 %s" % api_token} + + if self.args['using_proxy']: + # We're using a proxy, so tell the proxy how many copies we + # want it to store + headers['X-Keep-Desired-Replication'] = str(self.args['want_copies']) + + try: + _logger.debug("Uploading to {}".format(url)) + resp, content = h.request(url.encode('utf-8'), 'PUT', + headers=headers, + body=self.args['data']) + if (resp['status'] == '401' and + re.match(r'Timestamp verification failed', content)): + body = KeepClient.sign_for_old_server( + self.args['data_hash'], + self.args['data']) + h = httplib2.Http(timeout=self.args.get('timeout', None)) resp, content = h.request(url.encode('utf-8'), 'PUT', headers=headers, - body=self.args['data']) - if (resp['status'] == '401' and - re.match(r'Timestamp verification failed', content)): - body = KeepClient.sign_for_old_server( - self.args['data_hash'], - self.args['data']) - h = httplib2.Http() - resp, content = h.request(url.encode('utf-8'), 'PUT', - headers=headers, - body=body) - if re.match(r'^2\d\d$', resp['status']): - logging.debug("KeepWriterThread %s succeeded %s %s" % - (str(threading.current_thread()), - self.args['data_hash'], - self.args['service_root'])) - replicas_stored = 1 - if 'x-keep-replicas-stored' in resp: - # Tick the 'done' counter for the number of replica - # reported stored by the server, for the case that - # we're talking to a proxy or other backend that - # stores to multiple copies for us. - try: - replicas_stored = int(resp['x-keep-replicas-stored']) - except ValueError: - pass - return limiter.save_response(content.strip(), replicas_stored) - - logging.warning("Request fail: PUT %s => %s %s" % - (url, resp['status'], content)) - except (httplib2.HttpLib2Error, httplib.HTTPException) as e: - logging.warning("Request fail: PUT %s => %s: %s" % - (url, type(e), str(e))) - - def __init__(self): + body=body) + if re.match(r'^2\d\d$', resp['status']): + self._success = True + _logger.debug("KeepWriterThread %s succeeded %s %s", + str(threading.current_thread()), + self.args['data_hash'], + self.args['service_root']) + replicas_stored = 1 + if 'x-keep-replicas-stored' in resp: + # Tick the 'done' counter for the number of replica + # reported stored by the server, for the case that + # we're talking to a proxy or other backend that + # stores to multiple copies for us. + try: + replicas_stored = int(resp['x-keep-replicas-stored']) + except ValueError: + pass + limiter.save_response(content.strip(), replicas_stored) + else: + _logger.warning("Request fail: PUT %s => %s %s", + url, resp['status'], content) + except (httplib2.HttpLib2Error, + httplib.HTTPException, + ssl.SSLError) as e: + # When using https, timeouts look like ssl.SSLError from here. + # "SSLError: The write operation timed out" + _logger.warning("Request fail: PUT %s => %s: %s", + url, type(e), str(e)) + + def __init__(self, **kwargs): self.lock = threading.Lock() self.service_roots = None self._cache_lock = threading.Lock() @@ -258,6 +265,7 @@ class KeepClient(object): # default 256 megabyte cache self.cache_max = 256 * 1024 * 1024 self.using_proxy = False + self.timeout = kwargs.get('timeout', 60) def shuffled_service_roots(self, hash): if self.service_roots == None: @@ -291,7 +299,7 @@ class KeepClient(object): f['service_port'])) for f in keep_services) self.service_roots = sorted(set(roots)) - logging.debug(str(self.service_roots)) + _logger.debug(str(self.service_roots)) finally: self.lock.release() @@ -336,7 +344,7 @@ class KeepClient(object): # Remove the digits just used from the seed seed = seed[8:] - logging.debug(str(pseq)) + _logger.debug(str(pseq)) return pseq class CacheSlot(object): @@ -394,8 +402,6 @@ class KeepClient(object): self._cache_lock.release() def get(self, locator): - #logging.debug("Keep.get %s" % (locator)) - if re.search(r',', locator): return ''.join(self.get(x) for x in locator.split(',')) if 'KEEP_LOCAL_STORE' in os.environ: @@ -403,7 +409,6 @@ class KeepClient(object): expect_hash = re.sub(r'\+.*', '', locator) slot, first = self.reserve_cache(expect_hash) - #logging.debug("%s %s %s" % (slot, first, expect_hash)) if not first: v = slot.get() @@ -441,23 +446,23 @@ class KeepClient(object): def get_url(self, url, headers, expect_hash): h = httplib2.Http() try: - logging.info("Request: GET %s" % (url)) + _logger.info("Request: GET %s", url) with timer.Timer() as t: resp, content = h.request(url.encode('utf-8'), 'GET', headers=headers) - logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content), - t.msecs, - (len(content)/(1024*1024))/t.secs)) + _logger.info("Received %s bytes in %s msec (%s MiB/sec)", + len(content), t.msecs, + (len(content)/(1024*1024))/t.secs) if re.match(r'^2\d\d$', resp['status']): m = hashlib.new('md5') m.update(content) md5 = m.hexdigest() if md5 == expect_hash: return content - logging.warning("Checksum fail: md5(%s) = %s" % (url, md5)) + _logger.warning("Checksum fail: md5(%s) = %s", url, md5) except Exception as e: - logging.info("Request fail: GET %s => %s: %s" % - (url, type(e), str(e))) + _logger.info("Request fail: GET %s => %s: %s", + url, type(e), str(e)) return None def put(self, data, **kwargs): @@ -473,16 +478,33 @@ class KeepClient(object): threads = [] thread_limiter = KeepClient.ThreadLimiter(want_copies) for service_root in self.shuffled_service_roots(data_hash): - t = KeepClient.KeepWriterThread(data=data, - data_hash=data_hash, - service_root=service_root, - thread_limiter=thread_limiter, - using_proxy=self.using_proxy, - want_copies=(want_copies if self.using_proxy else 1)) + t = KeepClient.KeepWriterThread( + data=data, + data_hash=data_hash, + service_root=service_root, + thread_limiter=thread_limiter, + timeout=self.timeout, + using_proxy=self.using_proxy, + want_copies=(want_copies if self.using_proxy else 1)) t.start() threads += [t] for t in threads: t.join() + if thread_limiter.done() < want_copies: + # Retry the threads (i.e., services) that failed the first + # time around. + threads_retry = [] + for t in threads: + if not t.success(): + _logger.warning("Retrying: PUT %s %s", + t.args['service_root'], + t.args['data_hash']) + retry_with_args = t.args.copy() + t_retry = KeepClient.KeepWriterThread(**retry_with_args) + t_retry.start() + threads_retry += [t_retry] + for t in threads_retry: + t.join() have_copies = thread_limiter.done() # If we're done, return the response from Keep if have_copies >= want_copies: