with self._done_lock:
return (self._done < self._todo)
- def save_response(self, response_body):
+ def save_response(self, response_body, replicas_stored):
"""
Records a response body (a locator, possibly signed) returned by
the Keep server. It is not necessary to save more than one
response, since we presume that any locator returned
in response to a successful request is valid.
"""
with self._done_lock:
- self._done += 1
+ self._done += replicas_stored
self._response = response_body
def response(self):
url = self.args['service_root'] + self.args['data_hash']
api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
+
+ if self.args['using_proxy']:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
+
try:
+ logging.debug("Uploading to {}".format(url))
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
body=self.args['data'])
(str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root']))
- return limiter.save_response(content.strip())
+ replicas_stored = 1
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter for the number of replicas
+ # reported stored by the server, for the case that
+ # we're talking to a proxy or other backend that
+ # stores multiple copies for us.
+ try:
+ replicas_stored = int(resp['x-keep-replicas-stored'])
+ except ValueError:
+ pass
+ return limiter.save_response(content.strip(), replicas_stored)
+
logging.warning("Request fail: PUT %s => %s %s" %
(url, resp['status'], content))
except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
self._cache = []
# default 256 megabyte cache
self.cache_max = 256 * 1024 * 1024
+ self.using_proxy = False
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- try:
- keep_disks = arvados.api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+
+ # Override normal keep disk lookup with an explicit proxy
+ # configuration.
+ keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+ if keep_proxy_env != None and len(keep_proxy_env) > 0:
+
+ if keep_proxy_env[-1:] != '/':
+ keep_proxy_env += "/"
+ self.service_roots = [keep_proxy_env]
+ self.using_proxy = True
+ else:
+ try:
+ try:
+ keep_services = arvados.api().keep_services().accessible().execute()['items']
+ except Exception:
+ keep_services = arvados.api().keep_disks().list().execute()['items']
+
+ if len(keep_services) == 0:
+ raise arvados.errors.NoKeepServersError()
+
+ if 'service_type' in keep_services[0] and keep_services[0]['service_type'] == 'proxy':
+ self.using_proxy = True
+
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
+ logging.debug(str(self.service_roots))
+ finally:
+ self.lock.release()
# Build an ordering with which to query the Keep servers based on the
# contents of the hash.
# selected server.
probe = int(seed[0:8], 16) % len(pool)
- print seed[0:8], int(seed[0:8], 16), len(pool), probe
-
# Append the selected server to the probe sequence and remove it
# from the pool.
pseq += [pool[probe]]
t = KeepClient.KeepWriterThread(data=data,
data_hash=data_hash,
service_root=service_root,
- thread_limiter=thread_limiter)
+ thread_limiter=thread_limiter,
+ using_proxy=self.using_proxy,
+ want_copies=(want_copies if self.using_proxy else 1))
t.start()
threads += [t]
for t in threads:
t.join()
have_copies = thread_limiter.done()
# If we're done, return the response from Keep
- if have_copies == want_copies:
+ if have_copies >= want_copies:
return thread_limiter.response()
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %