From bf955fbaa337e2c382fa0af4d45d4937263d2004 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Sun, 17 Nov 2013 20:26:23 -0800
Subject: [PATCH] Use httplib2 to connect to Keep instead of executing whget/whput

Keep.get(locator) and Keep.put(data) keep their signatures but no longer
spawn whget/whput subprocesses. They delegate to a module-wide KeepClient
that is created lazily on first use (Keep.global_client_object()).

KeepClient:

* shuffled_service_roots(hash) lists keep_disks through the API client,
  builds one "http[s]://host:port/" root per disk, and caches the sorted,
  de-duplicated list on the instance. The hash argument is accepted but
  not used yet, so the roots are always returned in sorted order.

* get(locator) strips everything from the first "+" in the locator, then
  tries "GET <root><hash>" on each root with an "OAuth2" Authorization
  header taken from ARVADOS_API_TOKEN. A 2xx response is returned only if
  the md5 of its body equals the hash; a mismatch is logged as a warning
  and the next root is tried. Raises Exception("Not found: ...") when no
  root yields a valid block.

* put(data, copies=2) PUTs the data to "<root><md5>" on each root until
  the wanted number of copies has been acknowledged with a 2xx status,
  then returns "<md5>+<length>". A 401 whose body starts with
  "Timestamp verification failed" is retried once with the body produced
  by sign_for_old_server(). Raises Exception("Write fail ...") if the
  roots are exhausted first.

* sign_for_old_server(data_hash, data) prefixes the data with a PGP
  signed-message style wrapper carrying the current Unix time and the
  hash; the signature section is left empty.

When KEEP_LOCAL_STORE is set in the environment, get/put still short-circuit
to the local_store_* helpers. Because "class KeepClient:" is inserted above
them, local_store_put (and presumably local_store_get, which is outside this
hunk) now belong to KeepClient rather than Keep.

---
 sdk/python/arvados.py | 123 ++++++++++++++++++++++++++++++++----------
 1 file changed, 95 insertions(+), 28 deletions(-)

diff --git a/sdk/python/arvados.py b/sdk/python/arvados.py
index 7d99d582a6..2d08d72047 100644
--- a/sdk/python/arvados.py
+++ b/sdk/python/arvados.py
@@ -1,4 +1,5 @@
 import gflags
+import httplib
 import httplib2
 import logging
 import os
@@ -14,6 +15,7 @@ import string
 import bz2
 import zlib
 import fcntl
+import time
 
 from apiclient import errors
 from apiclient.discovery import build
@@ -787,40 +789,105 @@ class CollectionWriter:
             manifest += "\n"
         return manifest
 
+global_client_object = None
+
 class Keep:
     @staticmethod
-    def put(data):
-        if 'KEEP_LOCAL_STORE' in os.environ:
-            return Keep.local_store_put(data)
-        p = subprocess.Popen(["whput", "-"],
-                             stdout=subprocess.PIPE,
-                             stdin=subprocess.PIPE,
-                             stderr=subprocess.PIPE,
-                             shell=False, close_fds=True)
-        stdoutdata, stderrdata = p.communicate(data)
-        if p.returncode != 0:
-            raise Exception("whput subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
-        return stdoutdata.rstrip()
+    def global_client_object():
+        global global_client_object
+        if global_client_object == None:
+            global_client_object = KeepClient()
+        return global_client_object
+
     @staticmethod
     def get(locator):
+        return Keep.global_client_object().get(locator)
+
+    @staticmethod
+    def put(data):
+        return Keep.global_client_object().put(data)
+
+class KeepClient:
+    def __init__(self):
+        self.service_roots = None
+
+    def shuffled_service_roots(self, hash):
+        if self.service_roots == None:
+            keep_disks = api().keep_disks().list().execute()['items']
+            roots = (("http%s://%s:%d/" %
+                      ('s' if f['service_ssl_flag'] else '',
+                       f['service_host'],
+                       f['service_port']))
+                     for f in keep_disks)
+            self.service_roots = sorted(set(roots))
+            logging.debug(str(self.service_roots))
+        return self.service_roots
+
+    def get(self, locator):
         if 'KEEP_LOCAL_STORE' in os.environ:
-            return Keep.local_store_get(locator)
-        p = subprocess.Popen(["whget", locator, "-"],
-                             stdout=subprocess.PIPE,
-                             stdin=None,
-                             stderr=subprocess.PIPE,
-                             shell=False, close_fds=True)
-        stdoutdata, stderrdata = p.communicate(None)
-        if p.returncode != 0:
-            raise Exception("whget subprocess exited %d - stderr:\n%s" % (p.returncode, stderrdata))
+            return KeepClient.local_store_get(locator)
+        expect_hash = re.sub(r'\+.*', '', locator)
+        for service_root in self.shuffled_service_roots(expect_hash):
+            h = httplib2.Http()
+            url = service_root + expect_hash
+            api_token = os.environ['ARVADOS_API_TOKEN']
+            headers = {'Authorization': "OAuth2 %s" % api_token,
+                       'Accept': 'application/octet-stream'}
+            try:
+                resp, content = h.request(url, 'GET', headers=headers)
+                if re.match(r'^2\d\d$', resp['status']):
+                    m = hashlib.new('md5')
+                    m.update(content)
+                    md5 = m.hexdigest()
+                    if md5 == expect_hash:
+                        return content
+                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
+            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
+                logging.info("Request fail: GET %s => %s: %s" %
+                             (url, type(e), str(e)))
+        raise Exception("Not found: %s" % expect_hash)
+
+    def put(self, data, **kwargs):
+        if 'KEEP_LOCAL_STORE' in os.environ:
+            return KeepClient.local_store_put(data)
         m = hashlib.new('md5')
-        m.update(stdoutdata)
-        try:
-            if locator.index(m.hexdigest()) == 0:
-                return stdoutdata
-        except ValueError:
-            pass
-        raise Exception("md5 checksum mismatch: md5(get(%s)) == %s" % (locator, m.hexdigest()))
+        m.update(data)
+        data_hash = m.hexdigest()
+        have_copies = 0
+        want_copies = kwargs.get('copies', 2)
+        for service_root in self.shuffled_service_roots(data_hash):
+            h = httplib2.Http()
+            url = service_root + data_hash
+            api_token = os.environ['ARVADOS_API_TOKEN']
+            headers = {'Authorization': "OAuth2 %s" % api_token}
+            try:
+                resp, content = h.request(url, 'PUT',
+                                          headers=headers,
+                                          body=data)
+                if (resp['status'] == '401' and
+                    re.match(r'Timestamp verification failed', content)):
+                    body = self.sign_for_old_server(data_hash, data)
+                    h = httplib2.Http()
+                    resp, content = h.request(url, 'PUT',
+                                              headers=headers,
+                                              body=body)
+                if re.match(r'^2\d\d$', resp['status']):
+                    have_copies += 1
+                    if have_copies == want_copies:
+                        return data_hash + '+' + str(len(data))
+                else:
+                    logging.warning("Request fail: PUT %s => %s %s" %
+                                    (url, resp['status'], content))
+            except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+                logging.warning("Request fail: PUT %s => %s: %s" %
+                                (url, type(e), str(e)))
+        raise Exception("Write fail for %s: wanted %d but wrote %d" %
+                        (data_hash, want_copies, have_copies))
+
+    def sign_for_old_server(self, data_hash, data):
+        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+
+
     @staticmethod
     def local_store_put(data):
         m = hashlib.new('md5')
-- 
2.39.5