X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/da453701654115387e6b11189c6a830f24abf715..e9c78ef7855e7ae263fe461e069c89ff7fc0b798:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index c55b816156..e01fec412b 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -1,28 +1,16 @@ -import bz2 +import cStringIO import datetime -import fcntl -import functools -import gflags import hashlib -import json import logging +import math import os -import pprint import pycurl import Queue import re import socket import ssl -import string -import cStringIO -import subprocess -import sys import threading -import time import timer -import types -import UserDict -import zlib import arvados import arvados.config as config @@ -225,28 +213,36 @@ class KeepBlockCache(object): class KeepClient(object): # Default Keep server connection timeout: 2 seconds - # Default Keep server read timeout: 300 seconds + # Default Keep server read timeout: 64 seconds + # Default Keep server bandwidth minimum: 32768 bytes per second # Default Keep proxy connection timeout: 20 seconds - # Default Keep proxy read timeout: 300 seconds - DEFAULT_TIMEOUT = (2, 300) - DEFAULT_PROXY_TIMEOUT = (20, 300) + # Default Keep proxy read timeout: 64 seconds + # Default Keep proxy bandwidth minimum: 32768 bytes per second + DEFAULT_TIMEOUT = (2, 64, 32768) + DEFAULT_PROXY_TIMEOUT = (20, 64, 32768) class ThreadLimiter(object): - """ - Limit the number of threads running at a given time to - {desired successes} minus {successes reported}. When successes - reported == desired, wake up the remaining threads and tell - them to quit. + """Limit the number of threads writing to Keep at once. + + This ensures that only a number of writer threads that could + potentially achieve the desired replication level run at once. + Once the desired replication level is achieved, queued threads + are instructed not to run. Should be used in a "with" block. """ - def __init__(self, todo): + def __init__(self, want_copies, max_service_replicas): self._started = 0 - self._todo = todo + self._want_copies = want_copies self._done = 0 self._response = None self._start_lock = threading.Condition() - self._todo_lock = threading.Semaphore(todo) + if (not max_service_replicas) or (max_service_replicas >= want_copies): + max_threads = 1 + else: + max_threads = math.ceil(float(want_copies) / max_service_replicas) + _logger.debug("Limiter max threads is %d", max_threads) + self._todo_lock = threading.Semaphore(max_threads) self._done_lock = threading.Lock() self._local = threading.local() @@ -257,9 +253,9 @@ class KeepClient(object): # we wait here until N other threads have started. while self._started < self._local.sequence: self._start_lock.wait() + self._todo_lock.acquire() self._started += 1 self._start_lock.notifyAll() - self._todo_lock.acquire() self._start_lock.release() return self @@ -271,34 +267,28 @@ class KeepClient(object): def shall_i_proceed(self): """ - Return true if the current thread should do stuff. Return - false if the current thread should just stop. + Return true if the current thread should write to Keep. + Return false otherwise. """ with self._done_lock: - return (self._done < self._todo) + return (self._done < self._want_copies) def save_response(self, response_body, replicas_stored): """ Records a response body (a locator, possibly signed) returned by - the Keep server. It is not necessary to save more than - one response, since we presume that any locator returned - in response to a successful request is valid. + the Keep server, and the number of replicas it stored. """ with self._done_lock: self._done += replicas_stored self._response = response_body def response(self): - """ - Returns the body from the response to a PUT request. - """ + """Return the body from the response to a PUT request.""" with self._done_lock: return self._response def done(self): - """ - Return how many successes were reported. - """ + """Return the total number of replicas successfully stored.""" with self._done_lock: return self._done @@ -490,11 +480,17 @@ class KeepClient(object): if not timeouts: return elif isinstance(timeouts, tuple): - conn_t, xfer_t = timeouts + if len(timeouts) == 2: + conn_t, xfer_t = timeouts + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] + else: + conn_t, xfer_t, bandwidth_bps = timeouts else: conn_t, xfer_t = (timeouts, timeouts) + bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000)) - curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000)) + curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t))) + curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps))) def _headerfunction(self, header_line): header_line = header_line.decode('iso-8859-1') @@ -598,20 +594,22 @@ class KeepClient(object): :timeout: The initial timeout (in seconds) for HTTP requests to Keep - non-proxy servers. A tuple of two floats is interpreted as - (connection_timeout, read_timeout): see - http://docs.python-requests.org/en/latest/user/advanced/#timeouts. - Because timeouts are often a result of transient server load, the - actual connection timeout will be increased by a factor of two on - each retry. - Default: (2, 300). + non-proxy servers. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). A connection + will be aborted if the average traffic rate falls below + minimum_bandwidth bytes per second over an interval of read_timeout + seconds. Because timeouts are often a result of transient server + load, the actual connection timeout will be increased by a factor + of two on each retry. + Default: (2, 64, 32768). :proxy_timeout: The initial timeout (in seconds) for HTTP requests to - Keep proxies. A tuple of two floats is interpreted as - (connection_timeout, read_timeout). The behavior described - above for adjusting connection timeouts on retry also applies. - Default: (20, 300). + Keep proxies. A tuple of three floats is interpreted as + (connection_timeout, read_timeout, minimum_bandwidth). The behavior + described above for adjusting connection timeouts on retry also + applies. + Default: (20, 64, 32768). :api_token: If you're not using an API client, but only talking @@ -658,6 +656,7 @@ class KeepClient(object): self.put = self.local_store_put else: self.num_retries = num_retries + self.max_replicas_per_service = None if proxy: if not proxy.endswith('/'): proxy += '/' @@ -665,12 +664,12 @@ class KeepClient(object): self._gateway_services = {} self._keep_services = [{ 'uuid': 'proxy', + 'service_type': 'proxy', '_service_root': proxy, }] self._writable_services = self._keep_services self.using_proxy = True self._static_services_list = True - self.max_replicas_per_service = 1 else: # It's important to avoid instantiating an API client # unless we actually need one, for testing's sake. @@ -683,7 +682,6 @@ class KeepClient(object): self._writable_services = None self.using_proxy = None self._static_services_list = False - self.max_replicas_per_service = 1 def current_timeout(self, attempt_number): """Return the appropriate timeout to use for this client. @@ -698,7 +696,13 @@ class KeepClient(object): # TODO(twp): the timeout should be a property of a # KeepService, not a KeepClient. See #4488. t = self.proxy_timeout if self.using_proxy else self.timeout - return (t[0] * (1 << attempt_number), t[1]) + if len(t) == 2: + return (t[0] * (1 << attempt_number), t[1]) + else: + return (t[0] * (1 << attempt_number), t[1], t[2]) + def _any_nondisk_services(self, service_list): + return any(ks.get('service_type', 'disk') != 'disk' + for ks in service_list) def build_services_list(self, force_rebuild=False): if (self._static_services_list or @@ -710,12 +714,16 @@ class KeepClient(object): except Exception: # API server predates Keep services. keep_services = self.api_client.keep_disks().list() - accessible = keep_services.execute().get('items') - if not accessible: + # Gateway services are only used when specified by UUID, + # so there's nothing to gain by filtering them by + # service_type. + self._gateway_services = {ks['uuid']: ks for ks in + keep_services.execute()['items']} + if not self._gateway_services: raise arvados.errors.NoKeepServersError() # Precompute the base URI for each service. - for r in accessible: + for r in self._gateway_services.itervalues(): host = r['service_host'] if not host.startswith('[') and host.find(':') >= 0: # IPv6 URIs must be formatted like http://[::1]:80/... @@ -725,27 +733,19 @@ class KeepClient(object): host, r['service_port']) - # Gateway services are only used when specified by UUID, - # so there's nothing to gain by filtering them by - # service_type. - self._gateway_services = {ks.get('uuid'): ks for ks in accessible} _logger.debug(str(self._gateway_services)) - self._keep_services = [ - ks for ks in accessible - if ks.get('service_type') in ['disk', 'proxy']] - self._writable_services = [ - ks for ks in accessible - if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))] - _logger.debug(str(self._keep_services)) - - self.using_proxy = any(ks.get('service_type') == 'proxy' - for ks in self._keep_services) + ks for ks in self._gateway_services.itervalues() + if not ks.get('service_type', '').startswith('gateway:')] + self._writable_services = [ks for ks in self._keep_services + if not ks.get('read_only')] + # For disk type services, max_replicas_per_service is 1 - # It is unknown or unlimited for non-disk typed services. - for ks in accessible: - if ('disk' != ks.get('service_type')) and (not ks.get('read_only')): - self.max_replicas_per_service = None + # It is unknown (unlimited) for other service types. + if self._any_nondisk_services(self._writable_services): + self.max_replicas_per_service = None + else: + self.max_replicas_per_service = 1 def _service_weight(self, data_hash, service_uuid): """Compute the weight of a Keep service endpoint for a data @@ -782,7 +782,8 @@ class KeepClient(object): # in that order. use_services = self._keep_services if need_writable: - use_services = self._writable_services + use_services = self._writable_services + self.using_proxy = self._any_nondisk_services(use_services) sorted_roots.extend([ svc['_service_root'] for svc in sorted( use_services, @@ -966,10 +967,8 @@ class KeepClient(object): # Tell the proxy how many copies we want it to store headers['X-Keep-Desired-Replication'] = str(copies) roots_map = {} - thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies) loop = retry.RetryLoop(num_retries, self._check_loop_result, backoff_start=2) - thread_sequence = 0 for tries_left in loop: try: sorted_roots = self.map_new_services( @@ -979,6 +978,8 @@ class KeepClient(object): loop.save_result(error) continue + thread_limiter = KeepClient.ThreadLimiter( + copies, self.max_replicas_per_service) threads = [] for service_root, ks in [(root, roots_map[root]) for root in sorted_roots]: @@ -991,10 +992,9 @@ class KeepClient(object): service_root=service_root, thread_limiter=thread_limiter, timeout=self.current_timeout(num_retries-tries_left), - thread_sequence=thread_sequence) + thread_sequence=len(threads)) t.start() threads.append(t) - thread_sequence += 1 for t in threads: t.join() loop.save_result((thread_limiter.done() >= copies, len(threads)))