7161: clarify max_replicas_per_service
[arvados.git] / sdk / python / arvados / keep.py
index b26285e8fc6f3f54f26bba340f2ba9b85f977d2a..1b076b61ae41b0200b27986631b13b822823c4c1 100644 (file)
@@ -14,7 +14,7 @@ import re
 import socket
 import ssl
 import string
-import StringIO
+import cStringIO
 import subprocess
 import sys
 import threading
@@ -76,7 +76,7 @@ class KeepLocator(object):
             return getattr(self, data_name)
         def setter(self, hex_str):
             if not arvados.util.is_hex(hex_str, length):
-                raise ValueError("{} must be a {}-digit hex string: {}".
+                raise ValueError("{} is not a {}-digit hex string: {}".
                                  format(name, length, hex_str))
             setattr(self, data_name, hex_str)
         return property(getter, setter)
@@ -340,6 +340,15 @@ class KeepClient(object):
             except:
                 ua.close()
 
+        @staticmethod
+        def _socket_open(family, socktype, protocol, address=None):
+            """Open a socket with TCP keepalive on; pycurl has no CURLOPT_TCP_KEEPALIVE"""
+            s = socket.socket(family, socktype, protocol)
+            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
+            return s
+
         def get(self, locator, timeout=None):
             # locator is a KeepLocator object.
             url = self.root + str(locator)
@@ -348,12 +357,13 @@ class KeepClient(object):
             try:
                 with timer.Timer() as t:
                     self._headers = {}
-                    response_body = StringIO.StringIO()
+                    response_body = cStringIO.StringIO()
                     curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                     curl.setopt(pycurl.URL, url.encode('utf-8'))
                     curl.setopt(pycurl.HTTPHEADER, [
                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
-                    curl.setopt(pycurl.WRITEDATA, response_body)
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                     self._setcurltimeouts(curl, timeout)
                     try:
@@ -377,13 +387,19 @@ class KeepClient(object):
                 }
                 ok = False
             self._usable = ok != False
+            if self._result.get('status_code', None):
+                # The client worked well enough to get an HTTP status
+                # code, so presumably any problems are just on the
+                # server side and it's OK to reuse the client.
+                self._put_user_agent(curl)
+            else:
+                # Don't return this client to the pool, in case it's
+                # broken.
+                curl.close()
             if not ok:
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
-                # Don't return this ua to the pool, in case it's broken.
-                curl.close()
                 return None
-            self._put_user_agent(curl)
             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
@@ -404,14 +420,23 @@ class KeepClient(object):
             curl = self._get_user_agent()
             try:
                 self._headers = {}
-                response_body = StringIO.StringIO()
+                body_reader = cStringIO.StringIO(body)
+                response_body = cStringIO.StringIO()
                 curl.setopt(pycurl.NOSIGNAL, 1)
+                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                 curl.setopt(pycurl.URL, url.encode('utf-8'))
-                curl.setopt(pycurl.POSTFIELDS, body)
-                curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
+                # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                # Keep server (in the form of an HTTP/1.1 "100 Continue"
+                # response) instead of sending the request body immediately.
+                # This allows the server to reject the request if the request
+                # is invalid or the server is read-only, without waiting for
+                # the client to send the entire block.
+                curl.setopt(pycurl.UPLOAD, True)
+                curl.setopt(pycurl.INFILESIZE, len(body))
+                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                 curl.setopt(pycurl.HTTPHEADER, [
                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
-                curl.setopt(pycurl.WRITEDATA, response_body)
+                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                 self._setcurltimeouts(curl, timeout)
                 try:
@@ -435,13 +460,15 @@ class KeepClient(object):
                 }
                 ok = False
             self._usable = ok != False # still usable if ok is True or None
+            if self._result.get('status_code', None):
+                # Client is functional. See comment in get().
+                self._put_user_agent(curl)
+            else:
+                curl.close()
             if not ok:
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
-                # Don't return this ua to the pool, in case it's broken.
-                curl.close()
                 return False
-            self._put_user_agent(curl)
             return True
 
         def _setcurltimeouts(self, curl, timeouts):
@@ -621,8 +648,10 @@ class KeepClient(object):
                     'uuid': 'proxy',
                     '_service_root': proxy,
                     }]
+                self._writable_services = self._keep_services
                 self.using_proxy = True
                 self._static_services_list = True
+                self.max_replicas_per_service = None
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -632,8 +661,10 @@ class KeepClient(object):
                 self.api_token = api_client.api_token
                 self._gateway_services = {}
                 self._keep_services = None
+                self._writable_services = None
                 self.using_proxy = None
                 self._static_services_list = False
+                self.max_replicas_per_service = None
 
     def current_timeout(self, attempt_number):
         """Return the appropriate timeout to use for this client.
@@ -684,10 +715,19 @@ class KeepClient(object):
             self._keep_services = [
                 ks for ks in accessible
                 if ks.get('service_type') in ['disk', 'proxy']]
+            self._writable_services = [
+                ks for ks in accessible
+                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
             _logger.debug(str(self._keep_services))
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
                                    for ks in self._keep_services)
+            # Set max_replicas_per_service to 1 when every writable service is disk typed.
+            # If any writable service is not disk typed, use None to mean unknown.
+            self.max_replicas_per_service = 1
+            for ks in accessible:
+                if ('disk' != ks.get('service_type')) and not ks.get('read_only'):
+                    self.max_replicas_per_service = None
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -698,7 +738,7 @@ class KeepClient(object):
         """
         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
 
-    def weighted_service_roots(self, locator, force_rebuild=False):
+    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
         """Return an array of Keep service endpoints, in the order in
         which they should be probed when reading or writing data with
         the given hash+hints.
@@ -706,15 +746,14 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         sorted_roots = []
-
         # Use the services indicated by the given +K@... remote
         # service hints, if any are present and can be resolved to a
         # URI.
         for hint in locator.hints:
             if hint.startswith('K@'):
                 if len(hint) == 7:
-                    sorted_roots.append(
-                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
+                     sorted_roots.append(
+                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                 elif len(hint) == 29:
                     svc = self._gateway_services.get(hint[2:])
                     if svc:
@@ -723,21 +762,24 @@ class KeepClient(object):
         # Sort the available local services by weight (heaviest first)
         # for this locator, and return their service_roots (base URIs)
         # in that order.
+        use_services = self._keep_services
+        if need_writable:
+          use_services = self._writable_services
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
-                self._keep_services,
+                use_services,
                 reverse=True,
                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
         _logger.debug("{}: {}".format(locator, sorted_roots))
         return sorted_roots
 
-    def map_new_services(self, roots_map, locator, force_rebuild, **headers):
+    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
         # to KeepService objects.  Poll for Keep services, and add any
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.weighted_service_roots(locator, force_rebuild)
+        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(
@@ -765,7 +807,7 @@ class KeepClient(object):
     def get_from_cache(self, loc):
         """Fetch a block only if is in the cache, otherwise return None."""
         slot = self.block_cache.get(loc)
-        if slot.ready.is_set():
+        if slot is not None and slot.ready.is_set():
             return slot.get()
         else:
             return None
@@ -831,7 +873,8 @@ class KeepClient(object):
             try:
                 sorted_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries))
+                    force_rebuild=(tries_left < num_retries),
+                    need_writable=False)
             except Exception as error:
                 loop.save_result(error)
                 continue
@@ -893,26 +936,26 @@ class KeepClient(object):
         if isinstance(data, unicode):
             data = data.encode("ascii")
         elif not isinstance(data, str):
-            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
+            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
         data_hash = hashlib.md5(data).hexdigest()
+        loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
-            return data_hash
-        locator = KeepLocator(data_hash + '+' + str(len(data)))
+            return loc_s
+        locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Always send the desired copy count; a proxy uses it to decide how many copies to store
+        headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
+        thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
                 local_roots = self.map_new_services(
                     roots_map, locator,
-                    force_rebuild=(tries_left < num_retries), **headers)
+                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue