import socket
import ssl
import string
-import StringIO
+import cStringIO
import subprocess
import sys
import threading
return getattr(self, data_name)
def setter(self, hex_str):
if not arvados.util.is_hex(hex_str, length):
- raise ValueError("{} must be a {}-digit hex string: {}".
+ raise ValueError("{} is not a {}-digit hex string: {}".
format(name, length, hex_str))
setattr(self, data_name, hex_str)
return property(getter, setter)
except:
ua.close()
+ @staticmethod
+ def _socket_open(family, socktype, protocol, address=None):
+ """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
+ s = socket.socket(family, socktype, protocol)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+ return s
+
def get(self, locator, timeout=None):
# locator is a KeepLocator object.
url = self.root + str(locator)
try:
with timer.Timer() as t:
self._headers = {}
- response_body = StringIO.StringIO()
+ response_body = cStringIO.StringIO()
curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
curl.setopt(pycurl.URL, url.encode('utf-8'))
curl.setopt(pycurl.HTTPHEADER, [
'{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
- curl.setopt(pycurl.WRITEDATA, response_body)
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self._setcurltimeouts(curl, timeout)
try:
}
ok = False
self._usable = ok != False
+ if self._result.get('status_code', None):
+ # The client worked well enough to get an HTTP status
+ # code, so presumably any problems are just on the
+ # server side and it's OK to reuse the client.
+ self._put_user_agent(curl)
+ else:
+ # Don't return this client to the pool, in case it's
+ # broken.
+ curl.close()
if not ok:
_logger.debug("Request fail: GET %s => %s: %s",
url, type(self._result['error']), str(self._result['error']))
- # Don't return this ua to the pool, in case it's broken.
- curl.close()
return None
- self._put_user_agent(curl)
_logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
self._result['status_code'],
len(self._result['body']),
curl = self._get_user_agent()
try:
self._headers = {}
- response_body = StringIO.StringIO()
+ body_reader = cStringIO.StringIO(body)
+ response_body = cStringIO.StringIO()
curl.setopt(pycurl.NOSIGNAL, 1)
+ curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
curl.setopt(pycurl.URL, url.encode('utf-8'))
- curl.setopt(pycurl.POSTFIELDS, body)
- curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+ # Using UPLOAD tells cURL to wait for a "go ahead" from the
+ # Keep server (in the form of a HTTP/1.1 "100 Continue"
+ # response) instead of sending the request body immediately.
+ # This allows the server to reject the request if the request
+ # is invalid or the server is read-only, without waiting for
+ # the client to send the entire block.
+ curl.setopt(pycurl.UPLOAD, True)
+ curl.setopt(pycurl.INFILESIZE, len(body))
+ curl.setopt(pycurl.READFUNCTION, body_reader.read)
curl.setopt(pycurl.HTTPHEADER, [
'{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
- curl.setopt(pycurl.WRITEDATA, response_body)
+ curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
self._setcurltimeouts(curl, timeout)
try:
}
ok = False
self._usable = ok != False # still usable if ok is True or None
+ if self._result.get('status_code', None):
+ # Client is functional. See comment in get().
+ self._put_user_agent(curl)
+ else:
+ curl.close()
if not ok:
_logger.debug("Request fail: PUT %s => %s: %s",
url, type(self._result['error']), str(self._result['error']))
- # Don't return this ua to the pool, in case it's broken.
- curl.close()
return False
- self._put_user_agent(curl)
return True
def _setcurltimeouts(self, curl, timeouts):
'uuid': 'proxy',
'_service_root': proxy,
}]
+ self._writable_services = self._keep_services
self.using_proxy = True
self._static_services_list = True
+ self.max_replicas_per_service = None
else:
# It's important to avoid instantiating an API client
# unless we actually need one, for testing's sake.
self.api_token = api_client.api_token
self._gateway_services = {}
self._keep_services = None
+ self._writable_services = None
self.using_proxy = None
self._static_services_list = False
+ self.max_replicas_per_service = None
def current_timeout(self, attempt_number):
"""Return the appropriate timeout to use for this client.
self._keep_services = [
ks for ks in accessible
if ks.get('service_type') in ['disk', 'proxy']]
+ self._writable_services = [
+ ks for ks in accessible
+ if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
_logger.debug(str(self._keep_services))
self.using_proxy = any(ks.get('service_type') == 'proxy'
for ks in self._keep_services)
+ # Set max_replicas_per_service to 1 for disk typed services.
+ # In case of, non-disk typed services, we will use None to mean unknown.
+ self.max_replicas_per_service = 1
+ for ks in accessible:
+ if ('disk' != ks.get('service_type')) and not ks.get('read_only'):
+ self.max_replicas_per_service = None
def _service_weight(self, data_hash, service_uuid):
"""Compute the weight of a Keep service endpoint for a data
"""
return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
- def weighted_service_roots(self, locator, force_rebuild=False):
+ def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
"""Return an array of Keep service endpoints, in the order in
which they should be probed when reading or writing data with
the given hash+hints.
self.build_services_list(force_rebuild)
sorted_roots = []
-
# Use the services indicated by the given +K@... remote
# service hints, if any are present and can be resolved to a
# URI.
for hint in locator.hints:
if hint.startswith('K@'):
if len(hint) == 7:
- sorted_roots.append(
- "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+ sorted_roots.append(
+ "https://keep.{}.arvadosapi.com/".format(hint[2:]))
elif len(hint) == 29:
svc = self._gateway_services.get(hint[2:])
if svc:
# Sort the available local services by weight (heaviest first)
# for this locator, and return their service_roots (base URIs)
# in that order.
+ use_services = self._keep_services
+ if need_writable:
+ use_services = self._writable_services
sorted_roots.extend([
svc['_service_root'] for svc in sorted(
- self._keep_services,
+ use_services,
reverse=True,
key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
_logger.debug("{}: {}".format(locator, sorted_roots))
return sorted_roots
- def map_new_services(self, roots_map, locator, force_rebuild, **headers):
+ def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
# roots_map is a dictionary, mapping Keep service root strings
# to KeepService objects. Poll for Keep services, and add any
# new ones to roots_map. Return the current list of local
# root strings.
headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
- local_roots = self.weighted_service_roots(locator, force_rebuild)
+ local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
for root in local_roots:
if root not in roots_map:
roots_map[root] = self.KeepService(
def get_from_cache(self, loc):
"""Fetch a block only if is in the cache, otherwise return None."""
slot = self.block_cache.get(loc)
- if slot.ready.is_set():
+ if slot is not None and slot.ready.is_set():
return slot.get()
else:
return None
try:
sorted_roots = self.map_new_services(
roots_map, locator,
- force_rebuild=(tries_left < num_retries))
+ force_rebuild=(tries_left < num_retries),
+ need_writable=False)
except Exception as error:
loop.save_result(error)
continue
if isinstance(data, unicode):
data = data.encode("ascii")
elif not isinstance(data, str):
- raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+ raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
data_hash = hashlib.md5(data).hexdigest()
+ loc_s = data_hash + '+' + str(len(data))
if copies < 1:
- return data_hash
- locator = KeepLocator(data_hash + '+' + str(len(data)))
+ return loc_s
+ locator = KeepLocator(loc_s)
headers = {}
- if self.using_proxy:
- # Tell the proxy how many copies we want it to store
- headers['X-Keep-Desired-Replication'] = str(copies)
+ # Tell the proxy how many copies we want it to store
+ headers['X-Keep-Desired-Replication'] = str(copies)
roots_map = {}
- thread_limiter = KeepClient.ThreadLimiter(copies)
+ thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
for tries_left in loop:
try:
local_roots = self.map_new_services(
roots_map, locator,
- force_rebuild=(tries_left < num_retries), **headers)
+ force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
loop.save_result(error)
continue