Merge branch 'master' into 7253-datamanager-test-errors
[arvados.git] / sdk / python / arvados / keep.py
index 63b99daedd3d3931ac3822da62bff1d556d0806e..c584f04ca06226b842e690751fb323fe60ac256a 100644 (file)
@@ -1,28 +1,16 @@
-import bz2
+import cStringIO
 import datetime
-import fcntl
-import functools
-import gflags
 import hashlib
-import json
 import logging
+import math
 import os
-import pprint
 import pycurl
 import Queue
 import re
 import socket
 import ssl
-import string
-import cStringIO
-import subprocess
-import sys
 import threading
-import time
 import timer
-import types
-import UserDict
-import zlib
 
 import arvados
 import arvados.config as config
@@ -160,6 +148,8 @@ class KeepBlockCache(object):
         self._cache_lock = threading.Lock()
 
     class CacheSlot(object):
+        __slots__ = ("locator", "ready", "content")
+
         def __init__(self, locator):
             self.locator = locator
             self.ready = threading.Event()
@@ -222,72 +212,103 @@ class KeepBlockCache(object):
                 self._cache.insert(0, n)
                 return n, True
 
+
+class Counter(object):
+    def __init__(self, v=0):
+        self._lk = threading.Lock()
+        self._val = v
+
+    def add(self, v):
+        with self._lk:
+            self._val += v
+
+    def get(self):
+        with self._lk:
+            return self._val
+
+
 class KeepClient(object):
 
     # Default Keep server connection timeout:  2 seconds
-    # Default Keep server read timeout:      300 seconds
+    # Default Keep server read timeout:       64 seconds
+    # Default Keep server bandwidth minimum:  32768 bytes per second
     # Default Keep proxy connection timeout:  20 seconds
-    # Default Keep proxy read timeout:       300 seconds
-    DEFAULT_TIMEOUT = (2, 300)
-    DEFAULT_PROXY_TIMEOUT = (20, 300)
+    # Default Keep proxy read timeout:        64 seconds
+    # Default Keep proxy bandwidth minimum:   32768 bytes per second
+    DEFAULT_TIMEOUT = (2, 64, 32768)
+    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
 
     class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
+        """Limit the number of threads writing to Keep at once.
+
+        This ensures that only a number of writer threads that could
+        potentially achieve the desired replication level run at once.
+        Once the desired replication level is achieved, queued threads
+        are instructed not to run.
 
         Should be used in a "with" block.
         """
-        def __init__(self, todo):
-            self._todo = todo
+        def __init__(self, want_copies, max_service_replicas):
+            self._started = 0
+            self._want_copies = want_copies
             self._done = 0
             self._response = None
-            self._todo_lock = threading.Semaphore(todo)
+            self._start_lock = threading.Condition()
+            if (not max_service_replicas) or (max_service_replicas >= want_copies):
+                max_threads = 1
+            else:
+                max_threads = math.ceil(float(want_copies) / max_service_replicas)
+            _logger.debug("Limiter max threads is %d", max_threads)
+            self._todo_lock = threading.Semaphore(max_threads)
             self._done_lock = threading.Lock()
+            self._local = threading.local()
 
         def __enter__(self):
+            self._start_lock.acquire()
+            if getattr(self._local, 'sequence', None) is not None:
+                # If the calling thread has used set_sequence(N), then
+                # we wait here until N other threads have started.
+                while self._started < self._local.sequence:
+                    self._start_lock.wait()
             self._todo_lock.acquire()
+            self._started += 1
+            self._start_lock.notifyAll()
+            self._start_lock.release()
             return self
 
         def __exit__(self, type, value, traceback):
             self._todo_lock.release()
 
+        def set_sequence(self, sequence):
+            self._local.sequence = sequence
+
         def shall_i_proceed(self):
             """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
+            Return true if the current thread should write to Keep.
+            Return false otherwise.
             """
             with self._done_lock:
-                return (self._done < self._todo)
+                return (self._done < self._want_copies)
 
         def save_response(self, response_body, replicas_stored):
             """
             Records a response body (a locator, possibly signed) returned by
-            the Keep server.  It is not necessary to save more than
-            one response, since we presume that any locator returned
-            in response to a successful request is valid.
+            the Keep server, and the number of replicas it stored.
             """
             with self._done_lock:
                 self._done += replicas_stored
                 self._response = response_body
 
         def response(self):
-            """
-            Returns the body from the response to a PUT request.
-            """
+            """Return the body from the response to a PUT request."""
             with self._done_lock:
                 return self._response
 
         def done(self):
-            """
-            Return how many successes were reported.
-            """
+            """Return the total number of replicas successfully stored."""
             with self._done_lock:
                 return self._done
 
-
     class KeepService(object):
         """Make requests to a single Keep service, and track results.
 
@@ -306,7 +327,9 @@ class KeepClient(object):
             arvados.errors.HttpError,
         )
 
-        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
+        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
+                     upload_counter=None,
+                     download_counter=None, **headers):
             self.root = root
             self._user_agent_pool = user_agent_pool
             self._result = {'error': None}
@@ -315,6 +338,8 @@ class KeepClient(object):
             self.get_headers = {'Accept': 'application/octet-stream'}
             self.get_headers.update(headers)
             self.put_headers = headers
+            self.upload_counter = upload_counter
+            self.download_counter = download_counter
 
         def usable(self):
             """Is it worth attempting a request?"""
@@ -400,11 +425,13 @@ class KeepClient(object):
                 _logger.debug("Request fail: GET %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return None
-            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
+            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                          self._result['status_code'],
                          len(self._result['body']),
                          t.msecs,
                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.download_counter:
+                self.download_counter.add(len(self._result['body']))
             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
             if resp_md5 != locator.md5sum:
                 _logger.warning("Checksum fail: md5(%s) = %s",
@@ -419,36 +446,37 @@ class KeepClient(object):
             _logger.debug("Request: PUT %s", url)
             curl = self._get_user_agent()
             try:
-                self._headers = {}
-                body_reader = cStringIO.StringIO(body)
-                response_body = cStringIO.StringIO()
-                curl.setopt(pycurl.NOSIGNAL, 1)
-                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
-                curl.setopt(pycurl.URL, url.encode('utf-8'))
-                # Using UPLOAD tells cURL to wait for a "go ahead" from the
-                # Keep server (in the form of a HTTP/1.1 "100 Continue"
-                # response) instead of sending the request body immediately.
-                # This allows the server to reject the request if the request
-                # is invalid or the server is read-only, without waiting for
-                # the client to send the entire block.
-                curl.setopt(pycurl.UPLOAD, True)
-                curl.setopt(pycurl.INFILESIZE, len(body))
-                curl.setopt(pycurl.READFUNCTION, body_reader.read)
-                curl.setopt(pycurl.HTTPHEADER, [
-                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
-                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
-                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
-                self._setcurltimeouts(curl, timeout)
-                try:
-                    curl.perform()
-                except Exception as e:
-                    raise arvados.errors.HttpError(0, str(e))
-                self._result = {
-                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
-                    'body': response_body.getvalue(),
-                    'headers': self._headers,
-                    'error': False,
-                }
+                with timer.Timer() as t:
+                    self._headers = {}
+                    body_reader = cStringIO.StringIO(body)
+                    response_body = cStringIO.StringIO()
+                    curl.setopt(pycurl.NOSIGNAL, 1)
+                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
+                    curl.setopt(pycurl.URL, url.encode('utf-8'))
+                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
+                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
+                    # response) instead of sending the request body immediately.
+                    # This allows the server to reject the request if the request
+                    # is invalid or the server is read-only, without waiting for
+                    # the client to send the entire block.
+                    curl.setopt(pycurl.UPLOAD, True)
+                    curl.setopt(pycurl.INFILESIZE, len(body))
+                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
+                    curl.setopt(pycurl.HTTPHEADER, [
+                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
+                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
+                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
+                    self._setcurltimeouts(curl, timeout)
+                    try:
+                        curl.perform()
+                    except Exception as e:
+                        raise arvados.errors.HttpError(0, str(e))
+                    self._result = {
+                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
+                        'body': response_body.getvalue(),
+                        'headers': self._headers,
+                        'error': False,
+                    }
                 ok = retry.check_http_response_success(self._result['status_code'])
                 if not ok:
                     self._result['error'] = arvados.errors.HttpError(
@@ -469,17 +497,30 @@ class KeepClient(object):
                 _logger.debug("Request fail: PUT %s => %s: %s",
                               url, type(self._result['error']), str(self._result['error']))
                 return False
+            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
+                         self._result['status_code'],
+                         len(body),
+                         t.msecs,
+                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
+            if self.upload_counter:
+                self.upload_counter.add(len(body))
             return True
 
         def _setcurltimeouts(self, curl, timeouts):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
-                conn_t, xfer_t = timeouts
+                if len(timeouts) == 2:
+                    conn_t, xfer_t = timeouts
+                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
+                else:
+                    conn_t, xfer_t, bandwidth_bps = timeouts
             else:
                 conn_t, xfer_t = (timeouts, timeouts)
+                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
+            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             header_line = header_line.decode('iso-8859-1')
@@ -517,7 +558,11 @@ class KeepClient(object):
             return self._success
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
+            limiter = self.args['thread_limiter']
+            sequence = self.args['thread_sequence']
+            if sequence is not None:
+                limiter.set_sequence(sequence)
+            with limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
                     # me.
@@ -579,20 +624,22 @@ class KeepClient(object):
 
         :timeout:
           The initial timeout (in seconds) for HTTP requests to Keep
-          non-proxy servers.  A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout): see
-          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
-          Because timeouts are often a result of transient server load, the
-          actual connection timeout will be increased by a factor of two on
-          each retry.
-          Default: (2, 300).
+          non-proxy servers.  A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). A connection
+          will be aborted if the average traffic rate falls below
+          minimum_bandwidth bytes per second over an interval of read_timeout
+          seconds. Because timeouts are often a result of transient server
+          load, the actual connection timeout will be increased by a factor
+          of two on each retry.
+          Default: (2, 64, 32768).
 
         :proxy_timeout:
           The initial timeout (in seconds) for HTTP requests to
-          Keep proxies. A tuple of two floats is interpreted as
-          (connection_timeout, read_timeout). The behavior described
-          above for adjusting connection timeouts on retry also applies.
-          Default: (20, 300).
+          Keep proxies. A tuple of three floats is interpreted as
+          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
+          described above for adjusting connection timeouts on retry also
+          applies.
+          Default: (20, 64, 32768).
 
         :api_token:
           If you're not using an API client, but only talking
@@ -632,6 +679,12 @@ class KeepClient(object):
         self.timeout = timeout
         self.proxy_timeout = proxy_timeout
         self._user_agent_pool = Queue.LifoQueue()
+        self.upload_counter = Counter()
+        self.download_counter = Counter()
+        self.put_counter = Counter()
+        self.get_counter = Counter()
+        self.hits_counter = Counter()
+        self.misses_counter = Counter()
 
         if local_store:
             self.local_store = local_store
@@ -639,6 +692,7 @@ class KeepClient(object):
             self.put = self.local_store_put
         else:
             self.num_retries = num_retries
+            self.max_replicas_per_service = None
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -646,6 +700,7 @@ class KeepClient(object):
                 self._gateway_services = {}
                 self._keep_services = [{
                     'uuid': 'proxy',
+                    'service_type': 'proxy',
                     '_service_root': proxy,
                     }]
                 self._writable_services = self._keep_services
@@ -677,7 +732,13 @@ class KeepClient(object):
         # TODO(twp): the timeout should be a property of a
         # KeepService, not a KeepClient. See #4488.
         t = self.proxy_timeout if self.using_proxy else self.timeout
-        return (t[0] * (1 << attempt_number), t[1])
+        if len(t) == 2:
+            return (t[0] * (1 << attempt_number), t[1])
+        else:
+            return (t[0] * (1 << attempt_number), t[1], t[2])
+    def _any_nondisk_services(self, service_list):
+        return any(ks.get('service_type', 'disk') != 'disk'
+                   for ks in service_list)
 
     def build_services_list(self, force_rebuild=False):
         if (self._static_services_list or
@@ -689,12 +750,16 @@ class KeepClient(object):
             except Exception:  # API server predates Keep services.
                 keep_services = self.api_client.keep_disks().list()
 
-            accessible = keep_services.execute().get('items')
-            if not accessible:
+            # Gateway services are only used when specified by UUID,
+            # so there's nothing to gain by filtering them by
+            # service_type.
+            self._gateway_services = {ks['uuid']: ks for ks in
+                                      keep_services.execute()['items']}
+            if not self._gateway_services:
                 raise arvados.errors.NoKeepServersError()
 
             # Precompute the base URI for each service.
-            for r in accessible:
+            for r in self._gateway_services.itervalues():
                 host = r['service_host']
                 if not host.startswith('[') and host.find(':') >= 0:
                     # IPv6 URIs must be formatted like http://[::1]:80/...
@@ -704,22 +769,19 @@ class KeepClient(object):
                     host,
                     r['service_port'])
 
-            # Gateway services are only used when specified by UUID,
-            # so there's nothing to gain by filtering them by
-            # service_type.
-            self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
             _logger.debug(str(self._gateway_services))
-
             self._keep_services = [
-                ks for ks in accessible
-                if ks.get('service_type') in ['disk', 'proxy']]
-            self._writable_services = [
-                ks for ks in accessible
-                if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
-            _logger.debug(str(self._keep_services))
-
-            self.using_proxy = any(ks.get('service_type') == 'proxy'
-                                   for ks in self._keep_services)
+                ks for ks in self._gateway_services.itervalues()
+                if not ks.get('service_type', '').startswith('gateway:')]
+            self._writable_services = [ks for ks in self._keep_services
+                                       if not ks.get('read_only')]
+
+            # For disk type services, max_replicas_per_service is 1
+            # It is unknown (unlimited) for other service types.
+            if self._any_nondisk_services(self._writable_services):
+                self.max_replicas_per_service = None
+            else:
+                self.max_replicas_per_service = 1
 
     def _service_weight(self, data_hash, service_uuid):
         """Compute the weight of a Keep service endpoint for a data
@@ -738,7 +800,6 @@ class KeepClient(object):
         self.build_services_list(force_rebuild)
 
         sorted_roots = []
-
         # Use the services indicated by the given +K@... remote
         # service hints, if any are present and can be resolved to a
         # URI.
@@ -757,7 +818,8 @@ class KeepClient(object):
         # in that order.
         use_services = self._keep_services
         if need_writable:
-          use_services = self._writable_services
+            use_services = self._writable_services
+        self.using_proxy = self._any_nondisk_services(use_services)
         sorted_roots.extend([
             svc['_service_root'] for svc in sorted(
                 use_services,
@@ -776,7 +838,10 @@ class KeepClient(object):
         for root in local_roots:
             if root not in roots_map:
                 roots_map[root] = self.KeepService(
-                    root, self._user_agent_pool, **headers)
+                    root, self._user_agent_pool,
+                    upload_counter=self.upload_counter,
+                    download_counter=self.download_counter,
+                    **headers)
         return local_roots
 
     @staticmethod
@@ -828,12 +893,18 @@ class KeepClient(object):
         """
         if ',' in loc_s:
             return ''.join(self.get(x) for x in loc_s.split(','))
+
+        self.get_counter.add(1)
+
         locator = KeepLocator(loc_s)
         slot, first = self.block_cache.reserve_cache(locator.md5sum)
         if not first:
+            self.hits_counter.add(1)
             v = slot.get()
             return v
 
+        self.misses_counter.add(1)
+
         # If the locator has hints specifying a prefix (indicating a
         # remote keepproxy) or the UUID of a local gateway service,
         # read data from the indicated service(s) instead of the usual
@@ -848,7 +919,9 @@ class KeepClient(object):
                                    )])
         # Map root URLs to their KeepService objects.
         roots_map = {
-            root: self.KeepService(root, self._user_agent_pool)
+            root: self.KeepService(root, self._user_agent_pool,
+                                   upload_counter=self.upload_counter,
+                                   download_counter=self.download_counter)
             for root in hint_roots
         }
 
@@ -931,6 +1004,8 @@ class KeepClient(object):
         elif not isinstance(data, str):
             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
 
+        self.put_counter.add(1)
+
         data_hash = hashlib.md5(data).hexdigest()
         loc_s = data_hash + '+' + str(len(data))
         if copies < 1:
@@ -938,24 +1013,25 @@ class KeepClient(object):
         locator = KeepLocator(loc_s)
 
         headers = {}
-        if self.using_proxy:
-            # Tell the proxy how many copies we want it to store
-            headers['X-Keep-Desired-Replication'] = str(copies)
+        # Tell the proxy how many copies we want it to store
+        headers['X-Keep-Desired-Replication'] = str(copies)
         roots_map = {}
-        thread_limiter = KeepClient.ThreadLimiter(copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
+                sorted_roots = self.map_new_services(
                     roots_map, locator,
                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
                 loop.save_result(error)
                 continue
 
+            thread_limiter = KeepClient.ThreadLimiter(
+                copies, self.max_replicas_per_service)
             threads = []
-            for service_root, ks in roots_map.iteritems():
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 t = KeepClient.KeepWriterThread(
@@ -964,7 +1040,8 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left))
+                    timeout=self.current_timeout(num_retries-tries_left),
+                    thread_sequence=len(threads))
                 t.start()
                 threads.append(t)
             for t in threads:
@@ -979,7 +1056,7 @@ class KeepClient(object):
                     data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
-                              for key in local_roots
+                              for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(