X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c71a52a28c9cee6511172b3130cf74e8df1b4950..60c9d89a4064bee8f48717a197c40ac5bc28af79:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index f0ccf85d7e..8658774cbb 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -44,6 +44,7 @@ import arvados.errors import arvados.retry as retry import arvados.util import arvados.diskcache +from arvados._pycurlhelper import PyCurlHelper _logger = logging.getLogger('arvados.keep') global_client_object = None @@ -198,14 +199,18 @@ class KeepBlockCache(object): # open it initially and hold the flock(), and a second # hidden one used by mmap(). # - # Set max slots to 3/8 of maximum file handles. This - # means we'll use at most 3/4 of total file handles. + # Set max slots to 1/8 of maximum file handles. This + # means we'll use at most 1/4 of total file handles. # # NOFILE typically defaults to 1024 on Linux so this - # is 384 slots (768 file handles), which means we can - # cache up to 24 GiB of 64 MiB blocks. This leaves - # 256 file handles for sockets and other stuff. - self._max_slots = int((resource.getrlimit(resource.RLIMIT_NOFILE)[0] * 3) / 8) + # is 128 slots (256 file handles), which means we can + # cache up to 8 GiB of 64 MiB blocks. This leaves + # 768 file handles for sockets and other stuff. + # + # When we want the ability to have more cache (e.g. in + # arv-mount) we'll increase rlimit before calling + # this. + self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8) else: # RAM cache slots self._max_slots = 512 @@ -401,18 +406,10 @@ class Counter(object): class KeepClient(object): + DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT + DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT - # Default Keep server connection timeout: 2 seconds - # Default Keep server read timeout: 256 seconds - # Default Keep server bandwidth minimum: 32768 bytes per second - # Default Keep proxy connection timeout: 20 seconds - # Default Keep proxy read timeout: 256 seconds - # Default Keep proxy bandwidth minimum: 32768 bytes per second - DEFAULT_TIMEOUT = (2, 256, 32768) - DEFAULT_PROXY_TIMEOUT = (20, 256, 32768) - - - class KeepService(object): + class KeepService(PyCurlHelper): """Make requests to a single Keep service, and track results. A KeepService is intended to last long enough to perform one @@ -435,6 +432,7 @@ class KeepClient(object): download_counter=None, headers={}, insecure=False): + super(KeepClient.KeepService, self).__init__() self.root = root self._user_agent_pool = user_agent_pool self._result = {'error': None} @@ -472,30 +470,6 @@ class KeepClient(object): except: ua.close() - def _socket_open(self, *args, **kwargs): - if len(args) + len(kwargs) == 2: - return self._socket_open_pycurl_7_21_5(*args, **kwargs) - else: - return self._socket_open_pycurl_7_19_3(*args, **kwargs) - - def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None): - return self._socket_open_pycurl_7_21_5( - purpose=None, - address=collections.namedtuple( - 'Address', ['family', 'socktype', 'protocol', 'addr'], - )(family, socktype, protocol, address)) - - def _socket_open_pycurl_7_21_5(self, purpose, address): - """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE""" - s = socket.socket(address.family, address.socktype, address.protocol) - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - # Will throw invalid protocol error on mac. This test prevents that. - if hasattr(socket, 'TCP_KEEPIDLE'): - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75) - self._socket = s - return s - def get(self, locator, method="GET", timeout=None): # locator is a KeepLocator object. url = self.root + str(locator) @@ -521,6 +495,8 @@ class KeepClient(object): curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path()) if method == "HEAD": curl.setopt(pycurl.NOBODY, True) + else: + curl.setopt(pycurl.HTTPGET, True) self._setcurltimeouts(curl, timeout, method=="HEAD") try: @@ -665,43 +641,6 @@ class KeepClient(object): self.upload_counter.add(len(body)) return True - def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False): - if not timeouts: - return - elif isinstance(timeouts, tuple): - if len(timeouts) == 2: - conn_t, xfer_t = timeouts - bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] - else: - conn_t, xfer_t, bandwidth_bps = timeouts - else: - conn_t, xfer_t = (timeouts, timeouts) - bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2] - curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000)) - if not ignore_bandwidth: - curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t))) - curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps))) - - def _headerfunction(self, header_line): - if isinstance(header_line, bytes): - header_line = header_line.decode('iso-8859-1') - if ':' in header_line: - name, value = header_line.split(':', 1) - name = name.strip().lower() - value = value.strip() - elif self._headers: - name = self._lastheadername - value = self._headers[name] + ' ' + header_line.strip() - elif header_line.startswith('HTTP/'): - name = 'x-status-line' - value = header_line - else: - _logger.error("Unexpected header line: %s", header_line) - return - self._lastheadername = name - self._headers[name] = value - # Returning None implies all bytes were written - class KeepWriterQueue(queue.Queue): def __init__(self, copies, classes=[]): @@ -1235,21 +1174,37 @@ class KeepClient(object): try: locator = KeepLocator(loc_s) if method == "GET": - slot, first = self.block_cache.reserve_cache(locator.md5sum) - if not first: + while slot is None: + slot, first = self.block_cache.reserve_cache(locator.md5sum) + if first: + # Fresh and empty "first time it is used" slot + break if prefetch: - # this is request for a prefetch, if it is - # already in flight, return immediately. - # clear 'slot' to prevent finally block from - # calling slot.set() + # this is request for a prefetch to fill in + # the cache, don't need to wait for the + # result, so if it is already in flight return + # immediately. Clear 'slot' to prevent + # finally block from calling slot.set() slot = None return None - self.hits_counter.add(1) + blob = slot.get() - if blob is None: - raise arvados.errors.KeepReadError( - "failed to read {}".format(loc_s)) - return blob + if blob is not None: + self.hits_counter.add(1) + return blob + + # If blob is None, this means either + # + # (a) another thread was fetching this block and + # failed with an error or + # + # (b) cache thrashing caused the slot to be + # evicted (content set to None) by another thread + # between the call to reserve_cache() and get(). + # + # We'll handle these cases by reserving a new slot + # and then doing a full GET request. + slot = None self.misses_counter.add(1)