url = self.args['service_root'] + self.args['data_hash']
api_token = config.get('ARVADOS_API_TOKEN')
headers = {'Authorization': "OAuth2 %s" % api_token}
+
+ if self.args['using_proxy']:
+ # We're using a proxy, so tell the proxy how many copies we
+ # want it to store
+ headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])
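+ # The proxy reports how many copies it actually stored in the
+ # X-Keep-Replicas-Stored response header, which is checked below.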
+
try:
resp, content = h.request(url.encode('utf-8'), 'PUT',
headers=headers,
body=self.args['data'])
if re.match(r'^2\d\d$', resp['status']):
logging.info("KeepWriterThread %s succeeded %s %s" %
(str(threading.current_thread()),
self.args['data_hash'],
self.args['service_root']))
- return limiter.increment_done()
+
+ if 'x-keep-replicas-stored' in resp:
+ # Tick the 'done' counter for the number of replicas reported
+ # stored by the server, for the case that we're talking to a
+ # proxy or other backend that stores multiple copies for us.
+ replicas = int(resp['x-keep-replicas-stored'])
+ while replicas > 0:
+ limiter.increment_done()
+ replicas -= 1
+ else:
+ limiter.increment_done()
+ return
logging.warning("Request fail: PUT %s => %s %s" %
(url, resp['status'], content))
except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
self._cache = []
# default 256 megabyte cache
self.cache_max = 256 * 1024 * 1024
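+ # Set to True by shuffled_service_roots() when the client is
+ # talking to a Keep proxy instead of individual Keep disks.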
+ self.using_proxy = False
def shuffled_service_roots(self, hash):
if self.service_roots == None:
self.lock.acquire()
- try:
- keep_disks = arvados.api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- finally:
- self.lock.release()
+
+ # Override normal Keep disk lookup with an explicit proxy
+ # configuration.
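+ # ARVADOS_KEEP_PROXY is expected to hold the proxy's base URL
+ # (e.g. "https://keep.example.org:25107", a placeholder host);
+ # a trailing slash is appended below if it is missing.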
+ keep_proxy_env = config.get("ARVADOS_KEEP_PROXY")
+ if keep_proxy_env != None:
+ if keep_proxy_env[-1:] != '/':
+ keep_proxy_env += "/"
+ self.service_roots = [keep_proxy_env]
+ self.using_proxy = True
+ # The try/finally below only covers the non-proxy branch, so
+ # release the lock here as well.
+ self.lock.release()
+ else:
+ try:
+ try:
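+ # Try the keep_services endpoint first; fall back to the older
+ # keep_disks list for API servers that don't support it.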
+ keep_services = arvados.api().keep_services().accessible().execute()['items']
+ except Exception:
+ keep_services = arvados.api().keep_disks().list().execute()['items']
+
+ if len(keep_services) == 0:
+ raise arvados.errors.NoKeepServersError()
+
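+ # If the cluster advertises a proxy-type service, remember it so
+ # put() can ask the proxy to handle replication (see the
+ # want_copies change below).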
+ if keep_services[0].get('service_type') == 'proxy':
+ self.using_proxy = True
+
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_services)
+ self.service_roots = sorted(set(roots))
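+ # Dedup and sort so every client starts from the same canonical
+ # list before the per-hash shuffle applied below.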
+ logging.debug(str(self.service_roots))
+ finally:
+ self.lock.release()
seed = hash
pool = self.service_roots[:]
self._cache_lock.release()
def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
+ '''Reserve a cache slot for the specified locator,
or return the existing slot.'''
self._cache_lock.acquire()
try:
with timer.Timer() as t:
resp, content = h.request(url.encode('utf-8'), 'GET',
headers=headers)
- logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
- t.msecs,
+ logging.info("Received %s bytes in %s msec (%s MiB/sec)" % (len(content),
+ t.msecs,
(len(content)/(1024*1024))/t.secs))
if re.match(r'^2\d\d$', resp['status']):
m = hashlib.new('md5')
t = KeepClient.KeepWriterThread(data=data,
data_hash=data_hash,
service_root=service_root,
- thread_limiter=thread_limiter)
+ thread_limiter=thread_limiter,
+ using_proxy=self.using_proxy,
+ want_copies=(want_copies if self.using_proxy else 1))
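+ # Each thread writing directly to a Keep disk stores exactly one
+ # copy, so the full want_copies is only requested per thread when
+ # a proxy stores the copies for us.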
t.start()
threads += [t]
for t in threads:
t.join()
have_copies = thread_limiter.done()
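+ # A proxy may report storing more replicas than requested, so
+ # accept any count at or above want_copies.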
- if have_copies == want_copies:
+ if have_copies >= want_copies:
return (data_hash + '+' + str(len(data)))
raise arvados.errors.KeepWriteError(
"Write fail for %s: wanted %d but wrote %d" %