import arvados.retry as retry
import arvados.util
import arvados.diskcache
+from arvados._pycurlhelper import PyCurlHelper
_logger = logging.getLogger('arvados.keep')
global_client_object = None
# open it initially and hold the flock(), and a second
# hidden one used by mmap().
#
- # Set max slots to 3/8 of maximum file handles. This
- # means we'll use at most 3/4 of total file handles.
+ # Set max slots to 1/8 of maximum file handles. This
+ # means we'll use at most 1/4 of total file handles.
#
# NOFILE typically defaults to 1024 on Linux so this
- # is 384 slots (768 file handles), which means we can
- # cache up to 24 GiB of 64 MiB blocks. This leaves
- # 256 file handles for sockets and other stuff.
- self._max_slots = int((resource.getrlimit(resource.RLIMIT_NOFILE)[0] * 3) / 8)
+ # is 128 slots (256 file handles), which means we can
+ # cache up to 8 GiB of 64 MiB blocks. This leaves
+ # 768 file handles for sockets and other stuff.
+ #
+ # Callers that need a larger cache (e.g. arv-mount) are
+ # expected to raise the RLIMIT_NOFILE soft limit before
+ # this code runs, since the slot count is derived from it.
+ self._max_slots = int(resource.getrlimit(resource.RLIMIT_NOFILE)[0] / 8)
else:
# RAM cache slots
self._max_slots = 512
class KeepClient(object):
+ DEFAULT_TIMEOUT = PyCurlHelper.DEFAULT_TIMEOUT
+ DEFAULT_PROXY_TIMEOUT = PyCurlHelper.DEFAULT_PROXY_TIMEOUT
- # Default Keep server connection timeout: 2 seconds
- # Default Keep server read timeout: 256 seconds
- # Default Keep server bandwidth minimum: 32768 bytes per second
- # Default Keep proxy connection timeout: 20 seconds
- # Default Keep proxy read timeout: 256 seconds
- # Default Keep proxy bandwidth minimum: 32768 bytes per second
- DEFAULT_TIMEOUT = (2, 256, 32768)
- DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
-
-
- class KeepService(object):
+ class KeepService(PyCurlHelper):
"""Make requests to a single Keep service, and track results.
A KeepService is intended to last long enough to perform one
download_counter=None,
headers={},
insecure=False):
+ super(KeepClient.KeepService, self).__init__()
self.root = root
self._user_agent_pool = user_agent_pool
self._result = {'error': None}
except:
ua.close()
- def _socket_open(self, *args, **kwargs):
- if len(args) + len(kwargs) == 2:
- return self._socket_open_pycurl_7_21_5(*args, **kwargs)
- else:
- return self._socket_open_pycurl_7_19_3(*args, **kwargs)
-
- def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
- return self._socket_open_pycurl_7_21_5(
- purpose=None,
- address=collections.namedtuple(
- 'Address', ['family', 'socktype', 'protocol', 'addr'],
- )(family, socktype, protocol, address))
-
- def _socket_open_pycurl_7_21_5(self, purpose, address):
- """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(address.family, address.socktype, address.protocol)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- # Will throw invalid protocol error on mac. This test prevents that.
- if hasattr(socket, 'TCP_KEEPIDLE'):
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
- self._socket = s
- return s
-
def get(self, locator, method="GET", timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
+ else:
+ curl.setopt(pycurl.HTTPGET, True)
self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
- if not timeouts:
- return
- elif isinstance(timeouts, tuple):
- if len(timeouts) == 2:
- conn_t, xfer_t = timeouts
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- else:
- conn_t, xfer_t, bandwidth_bps = timeouts
- else:
- conn_t, xfer_t = (timeouts, timeouts)
- bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
- curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- if not ignore_bandwidth:
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
-
- def _headerfunction(self, header_line):
- if isinstance(header_line, bytes):
- header_line = header_line.decode('iso-8859-1')
- if ':' in header_line:
- name, value = header_line.split(':', 1)
- name = name.strip().lower()
- value = value.strip()
- elif self._headers:
- name = self._lastheadername
- value = self._headers[name] + ' ' + header_line.strip()
- elif header_line.startswith('HTTP/'):
- name = 'x-status-line'
- value = header_line
- else:
- _logger.error("Unexpected header line: %s", header_line)
- return
- self._lastheadername = name
- self._headers[name] = value
- # Returning None implies all bytes were written
-
class KeepWriterQueue(queue.Queue):
def __init__(self, copies, classes=[]):