Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / keep.py
index 1fd8fa5d2c640b1291cd99c643bf8d90ac5bd287..cc7dbd4161d1586e0c530c2d90a97c5c099670d0 100644 (file)
@@ -21,15 +21,15 @@ import ssl
 import socket
 import requests
 
-_logger = logging.getLogger('arvados.keep')
-global_client_object = None
-
 import arvados
 import arvados.config as config
 import arvados.errors
 import arvados.retry as retry
 import arvados.util
 
+_logger = logging.getLogger('arvados.keep')
+global_client_object = None
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -58,6 +58,9 @@ class KeepLocator(object):
                              self.permission_hint()] + self.hints
             if s is not None)
 
+    def stripped(self):  # bare "<md5sum>+<size>" form of this locator
+        return "%s+%i" % (self.md5sum, self.size)
+
     def _make_hex_prop(name, length):
         # Build and return a new property with the given name that
         # must be a hex string of the given length.
@@ -171,8 +174,7 @@ class KeepBlockCache(object):
 
     def cap_cache(self):
         '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
+        with self._cache_lock:
             # Select all slots except those where ready.is_set() and content is
             # None (that means there was an error reading the block).
             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
@@ -183,30 +185,35 @@ class KeepBlockCache(object):
                         del self._cache[i]
                         break
                 sm = sum([slot.size() for slot in self._cache])
-        finally:
-            self._cache_lock.release()
+
+    def _get(self, locator):
+        # Find the slot for this locator, promoting it to the front of
+        # the list on a hit.  Takes no lock itself.
+        for idx, slot in enumerate(self._cache):
+            if slot.locator != locator:
+                continue
+            if idx != 0:
+                del self._cache[idx]
+                self._cache.insert(0, slot)
+            return slot
+        return None
+
+    def get(self, locator):  # _get() under self._cache_lock; None on a miss
+        with self._cache_lock:
+            return self._get(locator)
 
     def reserve_cache(self, locator):
         '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepBlockCache.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
+        with self._cache_lock:
+            n = self._get(locator)
+            if n:
+                return n, False
+            else:
+                # Add a new cache slot for the locator
+                n = KeepBlockCache.CacheSlot(locator)
+                self._cache.insert(0, n)
+                return n, True
 
 class KeepClient(object):
 
@@ -279,10 +286,11 @@ class KeepClient(object):
         HTTP_ERRORS = (requests.exceptions.RequestException,
                        socket.error, ssl.SSLError)
 
-        def __init__(self, root, **headers):
+        def __init__(self, root, session, **headers):
             self.root = root
             self.last_result = None
             self.success_flag = None
+            self.session = session
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
@@ -305,7 +313,7 @@ class KeepClient(object):
             _logger.debug("Request: GET %s", url)
             try:
                 with timer.Timer() as t:
-                    result = requests.get(url.encode('utf-8'),
+                    result = self.session.get(url.encode('utf-8'),
                                           headers=self.get_headers,
                                           timeout=timeout)
             except self.HTTP_ERRORS as e:
@@ -318,7 +326,7 @@ class KeepClient(object):
                 content = result.content
                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                              self.last_status(), len(content), t.msecs,
-                             (len(content)/(1024.0*1024))/t.secs)
+                             (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
                 if self.success_flag:
                     resp_md5 = hashlib.md5(content).hexdigest()
                     if resp_md5 == locator.md5sum:
@@ -331,7 +339,7 @@ class KeepClient(object):
             url = self.root + hash_s
             _logger.debug("Request: PUT %s", url)
             try:
-                result = requests.put(url.encode('utf-8'),
+                result = self.session.put(url.encode('utf-8'),
                                       data=body,
                                       headers=self.put_headers,
                                       timeout=timeout)
@@ -371,9 +379,10 @@ class KeepClient(object):
         def run_with_limiter(self, limiter):
             if self.service.finished():
                 return
-            _logger.debug("KeepWriterThread %s proceeding %s %s",
+            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                           str(threading.current_thread()),
                           self.args['data_hash'],
+                          len(self.args['data']),
                           self.args['service_root'])
             self._success = bool(self.service.put(
                 self.args['data_hash'],
@@ -382,9 +391,10 @@ class KeepClient(object):
             status = self.service.last_status()
             if self._success:
                 result = self.service.last_result
-                _logger.debug("KeepWriterThread %s succeeded %s %s",
+                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                               str(threading.current_thread()),
                               self.args['data_hash'],
+                              len(self.args['data']),
                               self.args['service_root'])
                 # Tick the 'done' counter for the number of replica
                 # reported stored by the server, for the case that
@@ -404,7 +414,7 @@ class KeepClient(object):
     def __init__(self, api_client=None, proxy=None,
                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                  api_token=None, local_store=None, block_cache=None,
-                 num_retries=0):
+                 num_retries=0, session=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -462,13 +472,17 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.session = session if session is not None else requests.Session()
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
                 self.api_token = api_token
-                self.service_roots = [proxy]
+                self._keep_services = [{
+                    'uuid': 'proxy',
+                    '_service_root': proxy,
+                    }]
                 self.using_proxy = True
-                self.static_service_roots = True
+                self._static_services_list = True
             else:
                 # It's important to avoid instantiating an API client
                 # unless we actually need one, for testing's sake.
@@ -476,9 +490,9 @@ class KeepClient(object):
                     api_client = arvados.api('v1')
                 self.api_client = api_client
                 self.api_token = api_client.api_token
-                self.service_roots = None
+                self._keep_services = None
                 self.using_proxy = None
-                self.static_service_roots = False
+                self._static_services_list = False
 
     def current_timeout(self):
         """Return the appropriate timeout to use for this client: the proxy
@@ -489,9 +503,9 @@ class KeepClient(object):
         # KeepService, not a KeepClient. See #4488.
         return self.proxy_timeout if self.using_proxy else self.timeout
 
-    def build_service_roots(self, force_rebuild=False):
-        if (self.static_service_roots or
-              (self.service_roots and not force_rebuild)):
+    def build_services_list(self, force_rebuild=False):
+        if (self._static_services_list or
+              (self._keep_services and not force_rebuild)):
             return
         with self.lock:
             try:
@@ -499,68 +513,47 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            keep_services = keep_services.execute().get('items')
-            if not keep_services:
+            self._keep_services = keep_services.execute().get('items')
+            if not self._keep_services:
                 raise arvados.errors.NoKeepServersError()
 
             self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in keep_services)
-
-            roots = ("{}://[{}]:{:d}/".format(
-                        'https' if ks['service_ssl_flag'] else 'http',
-                         ks['service_host'],
-                         ks['service_port'])
-                     for ks in keep_services)
-            self.service_roots = sorted(set(roots))
-            _logger.debug(str(self.service_roots))
-
-    def shuffled_service_roots(self, hash, force_rebuild=False):
-        self.build_service_roots(force_rebuild)
-
-        # Build an ordering with which to query the Keep servers based on the
-        # contents of the hash.
-        # "hash" is a hex-encoded number at least 8 digits
-        # (32 bits) long
-
-        # seed used to calculate the next keep server from 'pool'
-        # to be added to 'pseq'
-        seed = hash
-
-        # Keep servers still to be added to the ordering
-        pool = self.service_roots[:]
-
-        # output probe sequence
-        pseq = []
-
-        # iterate while there are servers left to be assigned
-        while len(pool) > 0:
-            if len(seed) < 8:
-                # ran out of digits in the seed
-                if len(pseq) < len(hash) / 4:
-                    # the number of servers added to the probe sequence is less
-                    # than the number of 4-digit slices in 'hash' so refill the
-                    # seed with the last 4 digits and then append the contents
-                    # of 'hash'.
-                    seed = hash[-4:] + hash
-                else:
-                    # refill the seed with the contents of 'hash'
-                    seed += hash
-
-            # Take the next 8 digits (32 bytes) and interpret as an integer,
-            # then modulus with the size of the remaining pool to get the next
-            # selected server.
-            probe = int(seed[0:8], 16) % len(pool)
-
-            # Append the selected server to the probe sequence and remove it
-            # from the pool.
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-
-            # Remove the digits just used from the seed
-            seed = seed[8:]
-        _logger.debug(str(pseq))
-        return pseq
+                                   for ks in self._keep_services)
+
+            # Precompute the base URI for each service.
+            for r in self._keep_services:
+                r['_service_root'] = "{}://[{}]:{:d}/".format(
+                    'https' if r['service_ssl_flag'] else 'http',
+                    r['service_host'],
+                    r['service_port'])
+            _logger.debug(str(self._keep_services))
+
+    def _service_weight(self, data_hash, service_uuid):
+        """Return the weight of one Keep service for one data block.
+
+        The weight is the hex MD5 digest of data_hash followed by the
+        last 15 characters of service_uuid.
+        """
+        uuid_suffix = service_uuid[-15:]
+        return hashlib.md5(data_hash + uuid_suffix).hexdigest()
 
+    def weighted_service_roots(self, data_hash, force_rebuild=False):
+        """List Keep service base URIs in probe order for data_hash.
+
+        Calls build_services_list(force_rebuild) first, then ranks
+        every known service by _service_weight(), heaviest first.  The
+        same order applies when reading and when writing the block.
+        """
+        self.build_services_list(force_rebuild)
+
+        def weight(svc):
+            return self._service_weight(data_hash, svc['uuid'])
+
+        # sorted() is stable, so equal weights keep their list order.
+        ranked = sorted(self._keep_services, key=weight, reverse=True)
+        sorted_roots = [svc['_service_root'] for svc in ranked]
+        _logger.debug(data_hash + ': ' + str(sorted_roots))
+        return sorted_roots
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -568,10 +561,10 @@ class KeepClient(object):
         # new ones to roots_map.  Return the current list of local
         # root strings.
         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
-        local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
+        local_roots = self.weighted_service_roots(md5_s, force_rebuild)
         for root in local_roots:
             if root not in roots_map:
-                roots_map[root] = self.KeepService(root, **headers)
+                roots_map[root] = self.KeepService(root, self.session, **headers)
         return local_roots
 
     @staticmethod
@@ -593,7 +586,7 @@ class KeepClient(object):
             return None
 
     @retry.retry_method
-    def get(self, loc_s, num_retries=None):
+    def get(self, loc_s, num_retries=None, cache_only=False):
         """Get data from Keep.
 
         This method fetches one or more blocks of data from Keep.  It
@@ -612,12 +605,21 @@ class KeepClient(object):
           to fetch data from every available Keep service, along with any
           that are named in location hints in the locator.  The default value
           is set when the KeepClient is initialized.
+        * cache_only: If true, return the block data only if already present in
+          cache, otherwise return None.
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
+        if cache_only:
+            # block_cache.get() returns None on a cache miss.
+            slot = self.block_cache.get(expect_hash)
+            if slot is not None and slot.ready.is_set():
+                return slot.get()
+            return None
+
         slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
@@ -631,7 +633,7 @@ class KeepClient(object):
         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                       for hint in locator.hints if hint.startswith('K@')]
         # Map root URLs their KeepService objects.
-        roots_map = {root: self.KeepService(root) for root in hint_roots}
+        roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
         blob = None
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
@@ -661,17 +663,27 @@ class KeepClient(object):
         if loop.success():
             return blob
 
-        # No servers fulfilled the request.  Count how many responded
-        # "not found;" if the ratio is high enough (currently 75%), report
-        # Not Found; otherwise a generic error.
+        try:
+            all_roots = local_roots + hint_roots
+        except NameError:
+            # We never successfully fetched local_roots.
+            all_roots = hint_roots
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
-        not_founds = sum(1 for ks in roots_map.values()
-                         if ks.last_status() in set([403, 404, 410]))
-        if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
-            raise arvados.errors.NotFoundError(loc_s)
+        not_founds = sum(1 for key in all_roots
+                         if roots_map[key].last_status() in {403, 404, 410})
+        service_errors = ((key, roots_map[key].last_result)
+                          for key in all_roots)
+        if not roots_map:
+            raise arvados.errors.KeepReadError(
+                "failed to read {}: no Keep services available ({})".format(
+                    loc_s, loop.last_result()))
+        elif not_founds == len(all_roots):
+            raise arvados.errors.NotFoundError(
+                "{} not found".format(loc_s), service_errors)
         else:
-            raise arvados.errors.KeepReadError(loc_s)
+            raise arvados.errors.KeepReadError(
+                "failed to read {}".format(loc_s), service_errors)
 
     @retry.retry_method
     def put(self, data, copies=2, num_retries=None):
@@ -732,14 +744,30 @@ class KeepClient(object):
 
         if loop.success():
             return thread_limiter.response()
-        raise arvados.errors.KeepWriteError(
-            "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, copies, thread_limiter.done()))
-
-    # Local storage methods need no-op num_retries arguments to keep
-    # integration tests happy.  With better isolation they could
-    # probably be removed again.
-    def local_store_put(self, data, num_retries=0):
+        if not roots_map:
+            raise arvados.errors.KeepWriteError(
+                "failed to write {}: no Keep services available ({})".format(
+                    data_hash, loop.last_result()))
+        else:
+            service_errors = ((key, roots_map[key].last_result)
+                              for key in local_roots
+                              if not roots_map[key].success_flag)
+            raise arvados.errors.KeepWriteError(
+                "failed to write {} (wanted {} copies but wrote {})".format(
+                    data_hash, copies, thread_limiter.done()), service_errors)
+
+    def local_store_put(self, data, copies=1, num_retries=None):
+        """A stub for put().
+
+        This method is used in place of the real put() method when
+        using local storage (see constructor's local_store argument).
+
+        copies and num_retries arguments are ignored: they are here
+        only for the sake of offering the same call signature as
+        put().
+
+        Data stored this way can be retrieved via local_store_get().
+        """
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -748,7 +776,8 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s, num_retries=0):
+    def local_store_get(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
@@ -758,3 +787,6 @@ class KeepClient(object):
             return ''
         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
             return f.read()
+
+    def is_cached(self, locator):
+        return self.block_cache.get(KeepLocator(locator).md5sum) is not None