X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c33d036cfef0b0784d16593e66e3f4fce018f783..1795bd4ba883c85c8277848f4e1f7a4cf9983ec8:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 2b718d7a4d..1822af19c6 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -2,6 +2,7 @@ import cStringIO import datetime import hashlib import logging +import math import os import pycurl import Queue @@ -209,31 +210,54 @@ class KeepBlockCache(object): self._cache.insert(0, n) return n, True + +class Counter(object): + def __init__(self, v=0): + self._lk = threading.Lock() + self._val = v + + def add(self, v): + with self._lk: + self._val += v + + def get(self): + with self._lk: + return self._val + + class KeepClient(object): # Default Keep server connection timeout: 2 seconds - # Default Keep server read timeout: 300 seconds + # Default Keep server read timeout: 64 seconds + # Default Keep server bandwidth minimum: 32768 bytes per second # Default Keep proxy connection timeout: 20 seconds - # Default Keep proxy read timeout: 300 seconds - DEFAULT_TIMEOUT = (2, 300) - DEFAULT_PROXY_TIMEOUT = (20, 300) + # Default Keep proxy read timeout: 64 seconds + # Default Keep proxy bandwidth minimum: 32768 bytes per second + DEFAULT_TIMEOUT = (2, 64, 32768) + DEFAULT_PROXY_TIMEOUT = (20, 64, 32768) class ThreadLimiter(object): - """ - Limit the number of threads running at a given time to - {desired successes} minus {successes reported}. When successes - reported == desired, wake up the remaining threads and tell - them to quit. + """Limit the number of threads writing to Keep at once. + + This ensures that only a number of writer threads that could + potentially achieve the desired replication level run at once. + Once the desired replication level is achieved, queued threads + are instructed not to run. Should be used in a "with" block. """ - def __init__(self, todo): + def __init__(self, want_copies, max_service_replicas): self._started = 0 - self._todo = todo + self._want_copies = want_copies self._done = 0 self._response = None self._start_lock = threading.Condition() - self._todo_lock = threading.Semaphore(todo) + if (not max_service_replicas) or (max_service_replicas >= want_copies): + max_threads = 1 + else: + max_threads = math.ceil(float(want_copies) / max_service_replicas) + _logger.debug("Limiter max threads is %d", max_threads) + self._todo_lock = threading.Semaphore(max_threads) self._done_lock = threading.Lock() self._local = threading.local() @@ -258,38 +282,31 @@ class KeepClient(object): def shall_i_proceed(self): """ - Return true if the current thread should do stuff. Return - false if the current thread should just stop. + Return true if the current thread should write to Keep. + Return false otherwise. """ with self._done_lock: - return (self._done < self._todo) + return (self._done < self._want_copies) def save_response(self, response_body, replicas_stored): """ Records a response body (a locator, possibly signed) returned by - the Keep server. It is not necessary to save more than - one response, since we presume that any locator returned - in response to a successful request is valid. + the Keep server, and the number of replicas it stored. """ with self._done_lock: self._done += replicas_stored self._response = response_body def response(self): - """ - Returns the body from the response to a PUT request. - """ + """Return the body from the response to a PUT request.""" with self._done_lock: return self._response def done(self): - """ - Return how many successes were reported. - """ + """Return the total number of replicas successfully stored.""" with self._done_lock: return self._done - class KeepService(object): """Make requests to a single Keep service, and track results. @@ -308,7 +325,9 @@ class KeepClient(object): arvados.errors.HttpError, ) - def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers): + def __init__(self, root, user_agent_pool=Queue.LifoQueue(), + upload_counter=None, + download_counter=None, **headers): self.root = root self._user_agent_pool = user_agent_pool self._result = {'error': None} @@ -317,6 +336,8 @@ class KeepClient(object): self.get_headers = {'Accept': 'application/octet-stream'} self.get_headers.update(headers) self.put_headers = headers + self.upload_counter = upload_counter + self.download_counter = download_counter def usable(self): """Is it worth attempting a request?""" @@ -402,11 +423,13 @@ class KeepClient(object): _logger.debug("Request fail: GET %s => %s: %s", url, type(self._result['error']), str(self._result['error'])) return None - _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)", + _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)", self._result['status_code'], len(self._result['body']), t.msecs, (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0) + if self.download_counter: + self.download_counter.add(len(self._result['body'])) resp_md5 = hashlib.md5(self._result['body']).hexdigest() if resp_md5 != locator.md5sum: _logger.warning("Checksum fail: md5(%s) = %s", @@ -421,36 +444,37 @@ class KeepClient(object): _logger.debug("Request: PUT %s", url) curl = self._get_user_agent() try: - self._headers = {} - body_reader = cStringIO.StringIO(body) - response_body = cStringIO.StringIO() - curl.setopt(pycurl.NOSIGNAL, 1) - curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open) - curl.setopt(pycurl.URL, url.encode('utf-8')) - # Using UPLOAD tells cURL to wait for a "go ahead" from the - # Keep server (in the form of a HTTP/1.1 "100 Continue" - # response) instead of sending the request body immediately. - # This allows the server to reject the request if the request - # is invalid or the server is read-only, without waiting for - # the client to send the entire block. - curl.setopt(pycurl.UPLOAD, True) - curl.setopt(pycurl.INFILESIZE, len(body)) - curl.setopt(pycurl.READFUNCTION, body_reader.read) - curl.setopt(pycurl.HTTPHEADER, [ - '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()]) - curl.setopt(pycurl.WRITEFUNCTION, response_body.write) - curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) - self._setcurltimeouts(curl, timeout) - try: - curl.perform() - except Exception as e: - raise arvados.errors.HttpError(0, str(e)) - self._result = { - 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), - 'body': response_body.getvalue(), - 'headers': self._headers, - 'error': False, - } + with timer.Timer() as t: + self._headers = {} + body_reader = cStringIO.StringIO(body) + response_body = cStringIO.StringIO() + curl.setopt(pycurl.NOSIGNAL, 1) + curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open) + curl.setopt(pycurl.URL, url.encode('utf-8')) + # Using UPLOAD tells cURL to wait for a "go ahead" from the + # Keep server (in the form of a HTTP/1.1 "100 Continue" + # response) instead of sending the request body immediately. + # This allows the server to reject the request if the request + # is invalid or the server is read-only, without waiting for + # the client to send the entire block. + curl.setopt(pycurl.UPLOAD, True) + curl.setopt(pycurl.INFILESIZE, len(body)) + curl.setopt(pycurl.READFUNCTION, body_reader.read) + curl.setopt(pycurl.HTTPHEADER, [ + '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()]) + curl.setopt(pycurl.WRITEFUNCTION, response_body.write) + curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) + self._setcurltimeouts(curl, timeout) + try: + curl.perform() + except Exception as e: + raise arvados.errors.HttpError(0, str(e)) + self._result = { + 'status_code': curl.getinfo(pycurl.RESPONSE_CODE), + 'body': response_body.getvalue(), + 'headers': self._headers, + 'error': False, + } ok = retry.check_http_response_success(self._result['status_code']) if not ok: self._result['error'] = arvados.errors.HttpError( @@ -471,17 +495,30 @@ class KeepClient(object): _logger.debug("Request fail: PUT %s => %s: %s", url, type(self._result['error']), str(self._result['error'])) return False + _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)", + self._result['status_code'], + len(body), + t.msecs, + (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0) + if self.upload_counter: + self.upload_counter.add(len(body)) return True def _setcurltimeouts(self, curl, timeouts): if not timeouts: return elif isinstance(timeouts, tuple): - conn_t, xfer_t = timeouts + if len(timeouts) == 2: + conn_t, xfer_t = timeouts + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] + else: + conn_t, xfer_t, bandwidth_bps = timeouts else: conn_t, xfer_t = (timeouts, timeouts) + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000)) - curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000)) + curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t))) + curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps))) def _headerfunction(self, header_line): header_line = header_line.decode('iso-8859-1') @@ -585,20 +622,22 @@ class KeepClient(object): :timeout: The initial timeout (in seconds) for HTTP requests to Keep - non-proxy servers. A tuple of two floats is interpreted as - (connection_timeout, read_timeout): see - http://docs.python-requests.org/en/latest/user/advanced/#timeouts. - Because timeouts are often a result of transient server load, the - actual connection timeout will be increased by a factor of two on - each retry. - Default: (2, 300). + non-proxy servers. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). A connection + will be aborted if the average traffic rate falls below + minimum_bandwidth bytes per second over an interval of read_timeout + seconds. Because timeouts are often a result of transient server + load, the actual connection timeout will be increased by a factor + of two on each retry. + Default: (2, 64, 32768). :proxy_timeout: The initial timeout (in seconds) for HTTP requests to - Keep proxies. A tuple of two floats is interpreted as - (connection_timeout, read_timeout). The behavior described - above for adjusting connection timeouts on retry also applies. - Default: (20, 300). + Keep proxies. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). The behavior + described above for adjusting connection timeouts on retry also + applies. + Default: (20, 64, 32768). :api_token: If you're not using an API client, but only talking @@ -638,6 +677,12 @@ class KeepClient(object): self.timeout = timeout self.proxy_timeout = proxy_timeout self._user_agent_pool = Queue.LifoQueue() + self.upload_counter = Counter() + self.download_counter = Counter() + self.put_counter = Counter() + self.get_counter = Counter() + self.hits_counter = Counter() + self.misses_counter = Counter() if local_store: self.local_store = local_store @@ -685,8 +730,10 @@ class KeepClient(object): # TODO(twp): the timeout should be a property of a # KeepService, not a KeepClient. See #4488. t = self.proxy_timeout if self.using_proxy else self.timeout - return (t[0] * (1 << attempt_number), t[1]) - + if len(t) == 2: + return (t[0] * (1 << attempt_number), t[1]) + else: + return (t[0] * (1 << attempt_number), t[1], t[2]) def _any_nondisk_services(self, service_list): return any(ks.get('service_type', 'disk') != 'disk' for ks in service_list) @@ -789,7 +836,10 @@ class KeepClient(object): for root in local_roots: if root not in roots_map: roots_map[root] = self.KeepService( - root, self._user_agent_pool, **headers) + root, self._user_agent_pool, + upload_counter=self.upload_counter, + download_counter=self.download_counter, + **headers) return local_roots @staticmethod @@ -841,12 +891,18 @@ class KeepClient(object): """ if ',' in loc_s: return ''.join(self.get(x) for x in loc_s.split(',')) + + self.get_counter.add(1) + locator = KeepLocator(loc_s) slot, first = self.block_cache.reserve_cache(locator.md5sum) if not first: + self.hits_counter.add(1) v = slot.get() return v + self.misses_counter.add(1) + # If the locator has hints specifying a prefix (indicating a # remote keepproxy) or the UUID of a local gateway service, # read data from the indicated service(s) instead of the usual @@ -861,7 +917,9 @@ class KeepClient(object): )]) # Map root URLs to their KeepService objects. roots_map = { - root: self.KeepService(root, self._user_agent_pool) + root: self.KeepService(root, self._user_agent_pool, + upload_counter=self.upload_counter, + download_counter=self.download_counter) for root in hint_roots } @@ -944,6 +1002,8 @@ class KeepClient(object): elif not isinstance(data, str): raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'") + self.put_counter.add(1) + data_hash = hashlib.md5(data).hexdigest() loc_s = data_hash + '+' + str(len(data)) if copies < 1: @@ -965,7 +1025,8 @@ class KeepClient(object): loop.save_result(error) continue - thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies) + thread_limiter = KeepClient.ThreadLimiter( + copies, self.max_replicas_per_service) threads = [] for service_root, ks in [(root, roots_map[root]) for root in sorted_roots]: