X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7ab494e42b35b3769a326a16b7e90f1d20147ced..a830b5b560251c3143a7b1fd60db3f50a7021b34:/sdk/python/arvados/keep.py?ds=sidebyside diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index b26285e8fc..b2700ae5ba 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -14,7 +14,7 @@ import re import socket import ssl import string -import StringIO +import cStringIO import subprocess import sys import threading @@ -76,7 +76,7 @@ class KeepLocator(object): return getattr(self, data_name) def setter(self, hex_str): if not arvados.util.is_hex(hex_str, length): - raise ValueError("{} must be a {}-digit hex string: {}". + raise ValueError("{} is not a {}-digit hex string: {}". format(name, length, hex_str)) setattr(self, data_name, hex_str) return property(getter, setter) @@ -340,6 +340,15 @@ class KeepClient(object): except: ua.close() + @staticmethod + def _socket_open(family, socktype, protocol, address=None): + """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE""" + s = socket.socket(family, socktype, protocol) + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75) + return s + def get(self, locator, timeout=None): # locator is a KeepLocator object. url = self.root + str(locator) @@ -348,12 +357,13 @@ class KeepClient(object): try: with timer.Timer() as t: self._headers = {} - response_body = StringIO.StringIO() + response_body = cStringIO.StringIO() curl.setopt(pycurl.NOSIGNAL, 1) + curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open) curl.setopt(pycurl.URL, url.encode('utf-8')) curl.setopt(pycurl.HTTPHEADER, [ '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()]) - curl.setopt(pycurl.WRITEDATA, response_body) + curl.setopt(pycurl.WRITEFUNCTION, response_body.write) curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) self._setcurltimeouts(curl, timeout) try: @@ -377,13 +387,19 @@ class KeepClient(object): } ok = False self._usable = ok != False + if self._result.get('status_code', None): + # The client worked well enough to get an HTTP status + # code, so presumably any problems are just on the + # server side and it's OK to reuse the client. + self._put_user_agent(curl) + else: + # Don't return this client to the pool, in case it's + # broken. + curl.close() if not ok: _logger.debug("Request fail: GET %s => %s: %s", url, type(self._result['error']), str(self._result['error'])) - # Don't return this ua to the pool, in case it's broken. - curl.close() return None - self._put_user_agent(curl) _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)", self._result['status_code'], len(self._result['body']), @@ -404,14 +420,23 @@ class KeepClient(object): curl = self._get_user_agent() try: self._headers = {} - response_body = StringIO.StringIO() + body_reader = cStringIO.StringIO(body) + response_body = cStringIO.StringIO() curl.setopt(pycurl.NOSIGNAL, 1) + curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open) curl.setopt(pycurl.URL, url.encode('utf-8')) - curl.setopt(pycurl.POSTFIELDS, body) - curl.setopt(pycurl.CUSTOMREQUEST, 'PUT') + # Using UPLOAD tells cURL to wait for a "go ahead" from the + # Keep server (in the form of a HTTP/1.1 "100 Continue" + # response) instead of sending the request body immediately. + # This allows the server to reject the request if the request + # is invalid or the server is read-only, without waiting for + # the client to send the entire block. + curl.setopt(pycurl.UPLOAD, True) + curl.setopt(pycurl.INFILESIZE, len(body)) + curl.setopt(pycurl.READFUNCTION, body_reader.read) curl.setopt(pycurl.HTTPHEADER, [ '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()]) - curl.setopt(pycurl.WRITEDATA, response_body) + curl.setopt(pycurl.WRITEFUNCTION, response_body.write) curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction) self._setcurltimeouts(curl, timeout) try: @@ -435,13 +460,15 @@ class KeepClient(object): } ok = False self._usable = ok != False # still usable if ok is True or None + if self._result.get('status_code', None): + # Client is functional. See comment in get(). + self._put_user_agent(curl) + else: + curl.close() if not ok: _logger.debug("Request fail: PUT %s => %s: %s", url, type(self._result['error']), str(self._result['error'])) - # Don't return this ua to the pool, in case it's broken. - curl.close() return False - self._put_user_agent(curl) return True def _setcurltimeouts(self, curl, timeouts): @@ -621,6 +648,7 @@ class KeepClient(object): 'uuid': 'proxy', '_service_root': proxy, }] + self._writable_services = self._keep_services self.using_proxy = True self._static_services_list = True else: @@ -632,6 +660,7 @@ class KeepClient(object): self.api_token = api_client.api_token self._gateway_services = {} self._keep_services = None + self._writable_services = None self.using_proxy = None self._static_services_list = False @@ -684,6 +713,9 @@ class KeepClient(object): self._keep_services = [ ks for ks in accessible if ks.get('service_type') in ['disk', 'proxy']] + self._writable_services = [ + ks for ks in accessible + if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))] _logger.debug(str(self._keep_services)) self.using_proxy = any(ks.get('service_type') == 'proxy' @@ -698,7 +730,7 @@ class KeepClient(object): """ return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest() - def weighted_service_roots(self, locator, force_rebuild=False): + def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False): """Return an array of Keep service endpoints, in the order in which they should be probed when reading or writing data with the given hash+hints. @@ -723,21 +755,24 @@ class KeepClient(object): # Sort the available local services by weight (heaviest first) # for this locator, and return their service_roots (base URIs) # in that order. + use_services = self._keep_services + if need_writable: + use_services = self._writable_services sorted_roots.extend([ svc['_service_root'] for svc in sorted( - self._keep_services, + use_services, reverse=True, key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))]) _logger.debug("{}: {}".format(locator, sorted_roots)) return sorted_roots - def map_new_services(self, roots_map, locator, force_rebuild, **headers): + def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers): # roots_map is a dictionary, mapping Keep service root strings # to KeepService objects. Poll for Keep services, and add any # new ones to roots_map. Return the current list of local # root strings. headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,)) - local_roots = self.weighted_service_roots(locator, force_rebuild) + local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable) for root in local_roots: if root not in roots_map: roots_map[root] = self.KeepService( @@ -765,7 +800,7 @@ class KeepClient(object): def get_from_cache(self, loc): """Fetch a block only if is in the cache, otherwise return None.""" slot = self.block_cache.get(loc) - if slot.ready.is_set(): + if slot is not None and slot.ready.is_set(): return slot.get() else: return None @@ -831,7 +866,8 @@ class KeepClient(object): try: sorted_roots = self.map_new_services( roots_map, locator, - force_rebuild=(tries_left < num_retries)) + force_rebuild=(tries_left < num_retries), + need_writable=False) except Exception as error: loop.save_result(error) continue @@ -893,7 +929,7 @@ class KeepClient(object): if isinstance(data, unicode): data = data.encode("ascii") elif not isinstance(data, str): - raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'") + raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'") data_hash = hashlib.md5(data).hexdigest() if copies < 1: @@ -912,7 +948,7 @@ class KeepClient(object): try: local_roots = self.map_new_services( roots_map, locator, - force_rebuild=(tries_left < num_retries), **headers) + force_rebuild=(tries_left < num_retries), need_writable=True, **headers) except Exception as error: loop.save_result(error) continue