Merge branch 'master' into 2681-new-inactive-user-notification
[arvados.git] / sdk / python / arvados / keep.py
index 88487ae96e672726cfa5dbb3142dcfdeafcbec94..a93c602f46221e919d805501796e5ba8327baf41 100644 (file)
@@ -55,6 +55,7 @@ class KeepClient(object):
         def __init__(self, todo):
             self._todo = todo
             self._done = 0
+            self._response = None
             self._todo_lock = threading.Semaphore(todo)
             self._done_lock = threading.Lock()
 
@@ -73,12 +74,23 @@ class KeepClient(object):
             with self._done_lock:
                 return (self._done < self._todo)
 
-        def increment_done(self):
+        def save_response(self, response_body, replicas_stored):
             """
-            Report that the current thread was successful.
+            Records a response body (a locator, possibly signed) returned by
+            the Keep server.  It is not necessary to save more than
+            one response, since we presume that any locator returned
+            in response to a successful request is valid.
             """
             with self._done_lock:
-                self._done += 1
+                self._done += replicas_stored
+                self._response = response_body
+
+        def response(self):
+            """
+            Returns the body from the response to a PUT request.
+            """
+            with self._done_lock:
+                return self._response
 
         def done(self):
             """
@@ -89,9 +101,9 @@ class KeepClient(object):
 
     class KeepWriterThread(threading.Thread):
         """
-        Write a blob of data to the given Keep server. Call
-        increment_done() of the given ThreadLimiter if the write
-        succeeds.
+        Write a blob of data to the given Keep server. On success, call
+        save_response() of the given ThreadLimiter to save the returned
+        locator.
         """
         def __init__(self, **kwargs):
             super(KeepClient.KeepWriterThread, self).__init__()
@@ -111,7 +123,14 @@ class KeepClient(object):
                 url = self.args['service_root'] + self.args['data_hash']
                 api_token = config.get('ARVADOS_API_TOKEN')
                 headers = {'Authorization': "OAuth2 %s" % api_token}
+
+                if self.args['using_proxy']:
+                    # We're using a proxy, so tell the proxy how many copies we
+                    # want it to store
+                    headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
                 try:
+                    logging.debug("Uploading to {}".format(url))
                     resp, content = h.request(url.encode('utf-8'), 'PUT',
                                               headers=headers,
                                               body=self.args['data'])
@@ -129,7 +148,18 @@ class KeepClient(object):
                                       (str(threading.current_thread()),
                                        self.args['data_hash'],
                                        self.args['service_root']))
-                        return limiter.increment_done()
+                        replicas_stored = 1
+                        if 'x-keep-replicas-stored' in resp:
+                            # Tick the 'done' counter by the number of replicas
+                            # the server reports as stored, for the case that
+                            # we're talking to a proxy or other backend that
+                            # stores multiple copies for us.
+                            try:
+                                replicas_stored = int(resp['x-keep-replicas-stored'])
+                            except ValueError:
+                                pass
+                        return limiter.save_response(content.strip(), replicas_stored)
+
                     logging.warning("Request fail: PUT %s => %s %s" %
                                     (url, resp['status'], content))
                 except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
@@ -143,34 +173,84 @@ class KeepClient(object):
         self._cache = []
         # default 256 megabyte cache
         self.cache_max = 256 * 1024 * 1024
+        self.using_proxy = False
 
     def shuffled_service_roots(self, hash):
         if self.service_roots == None:
             self.lock.acquire()
-            try:
-                keep_disks = arvados.api().keep_disks().list().execute()['items']
-                roots = (("http%s://%s:%d/" %
-                          ('s' if f['service_ssl_flag'] else '',
-                           f['service_host'],
-                           f['service_port']))
-                         for f in keep_disks)
-                self.service_roots = sorted(set(roots))
-                logging.debug(str(self.service_roots))
-            finally:
-                self.lock.release()
 
+            # Override normal keep disk lookup with an explicit proxy
+            # configuration.
+            keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+            if keep_proxy_env != None and len(keep_proxy_env) > 0:
+
+                if keep_proxy_env[-1:] != '/':
+                    keep_proxy_env += "/"
+                self.service_roots = [keep_proxy_env]
+                self.using_proxy = True
+            else:
+                try:
+                    try:
+                        keep_services = arvados.api().keep_services().accessible().execute()['items']
+                    except Exception:
+                        keep_services = arvados.api().keep_disks().list().execute()['items']
+
+                    if len(keep_services) == 0:
+                        raise arvados.errors.NoKeepServersError()
+
+                    if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
+                        self.using_proxy = True
+
+                    roots = (("http%s://%s:%d/" %
+                              ('s' if f['service_ssl_flag'] else '',
+                               f['service_host'],
+                               f['service_port']))
+                             for f in keep_services)
+                    self.service_roots = sorted(set(roots))
+                    logging.debug(str(self.service_roots))
+                finally:
+                    self.lock.release()
+
+        # Build an ordering with which to query the Keep servers based on the
+        # contents of the hash.
+        # "hash" is a hex-encoded number at least 8 digits
+        # (32 bits) long
+
+        # seed used to calculate the next keep server from 'pool'
+        # to be added to 'pseq'
         seed = hash
+
+        # Keep servers still to be added to the ordering
         pool = self.service_roots[:]
+
+        # output probe sequence
         pseq = []
+
+        # iterate while there are servers left to be assigned
         while len(pool) > 0:
             if len(seed) < 8:
-                if len(pseq) < len(hash) / 4: # first time around
+                # ran out of digits in the seed
+                if len(pseq) < len(hash) / 4:
+                    # the number of servers added to the probe sequence is less
+                    # than the number of 4-digit slices in 'hash' so refill the
+                    # seed with the last 4 digits and then append the contents
+                    # of 'hash'.
                     seed = hash[-4:] + hash
                 else:
+                    # refill the seed with the contents of 'hash'
                     seed += hash
+
+            # Take the next 8 digits (32 bits) and interpret as an integer,
+            # then take it modulo the size of the remaining pool to get the
+            # next selected server.
             probe = int(seed[0:8], 16) % len(pool)
+
+            # Append the selected server to the probe sequence and remove it
+            # from the pool.
             pseq += [pool[probe]]
             pool = pool[:probe] + pool[probe+1:]
+
+            # Remove the digits just used from the seed
             seed = seed[8:]
         logging.debug(str(pseq))
         return pseq
@@ -208,7 +288,7 @@ class KeepClient(object):
             self._cache_lock.release()
 
     def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator, 
+        '''Reserve a cache slot for the specified locator,
         or return the existing slot.'''
         self._cache_lock.acquire()
         try:
@@ -247,7 +327,7 @@ class KeepClient(object):
 
         try:
             for service_root in self.shuffled_service_roots(expect_hash):
-                url = service_root + expect_hash
+                url = service_root + locator
                 api_token = config.get('ARVADOS_API_TOKEN')
                 headers = {'Authorization': "OAuth2 %s" % api_token,
                            'Accept': 'application/octet-stream'}
@@ -259,7 +339,7 @@ class KeepClient(object):
 
             for location_hint in re.finditer(r'\+K@([a-z0-9]+)', locator):
                 instance = location_hint.group(1)
-                url = 'http://keep.' + instance + '.arvadosapi.com/' + expect_hash
+                url = 'http://keep.' + instance + '.arvadosapi.com/' + locator
                 blob = self.get_url(url, {}, expect_hash)
                 if blob:
                     slot.set(blob)
@@ -281,8 +361,8 @@ class KeepClient(object):
             with timer.Timer() as t:
                 resp, content = h.request(url.encode('utf-8'), 'GET',
                                           headers=headers)
-            logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content), 
-                                                                        t.msecs, 
+            logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
+                                                                        t.msecs,
                                                                         (len(content)/(1024*1024))/t.secs))
             if re.match(r'^2\d\d$', resp['status']):
                 m = hashlib.new('md5')
@@ -312,14 +392,17 @@ class KeepClient(object):
             t = KeepClient.KeepWriterThread(data=data,
                                             data_hash=data_hash,
                                             service_root=service_root,
-                                            thread_limiter=thread_limiter)
+                                            thread_limiter=thread_limiter,
+                                            using_proxy=self.using_proxy,
+                                            want_copies=(want_copies if self.using_proxy else 1))
             t.start()
             threads += [t]
         for t in threads:
             t.join()
         have_copies = thread_limiter.done()
-        if have_copies == want_copies:
-            return (data_hash + '+' + str(len(data)))
+        # If we're done, return the response from Keep
+        if have_copies >= want_copies:
+            return thread_limiter.response()
         raise arvados.errors.KeepWriteError(
             "Write fail for %s: wanted %d but wrote %d" %
             (data_hash, want_copies, have_copies))