From: Tom Clegg
Date: Tue, 19 Nov 2013 23:07:21 +0000 (-0800)
Subject: Add threaded Keep.put()
X-Git-Tag: 1.1.0~2920
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/0b14778a494b1bd1bfea56b2dbba372417fca972

Add threaded Keep.put()

Review amendments to the arvados.py hunks (hunk line counts are
updated accordingly; the index hashes are those of the original
commit):

* ThreadLimiter now uses a condition variable and a count of running
  threads, so that no more than {desired successes} minus {successes
  reported} writers run at once, as its docstring says. Previously a
  writer gave its semaphore slot back even after a success, so more
  successes than requested could be reported.
* KeepClient.put() accepts have_copies >= want_copies instead of
  requiring equality, so surplus successful writes can no longer turn
  into a "Write fail" exception.
---

diff --git a/sdk/python/arvados.py b/sdk/python/arvados.py
index 91e00630c9..4f2762eaa7 100644
--- a/sdk/python/arvados.py
+++ b/sdk/python/arvados.py
@@ -781,14 +781,121 @@ class Keep:
         return global_client_object
 
     @staticmethod
-    def get(locator):
-        return Keep.global_client_object().get(locator)
+    def get(locator, **kwargs):
+        return Keep.global_client_object().get(locator, **kwargs)
 
     @staticmethod
-    def put(data):
-        return Keep.global_client_object().put(data)
+    def put(data, **kwargs):
+        return Keep.global_client_object().put(data, **kwargs)
 
 class KeepClient:
+
+    class ThreadLimiter(object):
+        """
+        Limit the number of threads running at a given time to
+        {desired successes} minus {successes reported}. When successes
+        reported == desired, wake up the remaining threads and tell
+        them to quit.
+
+        Should be used in a "with" block.
+        """
+        def __init__(self, todo):
+            # Number of successful writes wanted.
+            self._todo = todo
+            # Number of successes reported so far.
+            self._done = 0
+            # Number of threads currently inside a "with" block.
+            self._running = 0
+            # Guards the counters above and wakes up waiting threads.
+            self._cond = threading.Condition()
+        def __enter__(self):
+            with self._cond:
+                # Each success uses up a slot for good, so admit at most
+                # (todo - done) threads at once. When the goal has been
+                # met, let everyone in: shall_i_proceed() tells them to
+                # quit.
+                while (self._done < self._todo and
+                       self._running >= self._todo - self._done):
+                    self._cond.wait()
+                self._running += 1
+            return self
+        def __exit__(self, type, value, traceback):
+            with self._cond:
+                self._running -= 1
+                self._cond.notify_all()
+        def shall_i_proceed(self):
+            """
+            Return true if the current thread should do stuff. Return
+            false if the current thread should just stop.
+            """
+            with self._cond:
+                return (self._done < self._todo)
+        def increment_done(self):
+            """
+            Report that the current thread was successful.
+            """
+            with self._cond:
+                self._done += 1
+                self._cond.notify_all()
+        def done(self):
+            """
+            Return how many successes were reported.
+            """
+            with self._cond:
+                return self._done
+
+    class KeepWriterThread(threading.Thread):
+        """
+        Write a blob of data to the given Keep server. Call
+        increment_done() of the given ThreadLimiter if the write
+        succeeds.
+
+        Keyword arguments: data, data_hash, service_root,
+        thread_limiter.
+        """
+        def __init__(self, **kwargs):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.args = kwargs
+        def run(self):
+            with self.args['thread_limiter'] as limiter:
+                if not limiter.shall_i_proceed():
+                    # My turn arrived, but the job has been done without
+                    # me.
+                    return
+                logging.debug("KeepWriterThread %s proceeding %s %s" %
+                              (str(threading.current_thread()),
+                               self.args['data_hash'],
+                               self.args['service_root']))
+                h = httplib2.Http()
+                url = self.args['service_root'] + self.args['data_hash']
+                api_token = os.environ['ARVADOS_API_TOKEN']
+                headers = {'Authorization': "OAuth2 %s" % api_token}
+                try:
+                    resp, content = h.request(url.encode('utf-8'), 'PUT',
+                                              headers=headers,
+                                              body=self.args['data'])
+                    if (resp['status'] == '401' and
+                        re.match(r'Timestamp verification failed', content)):
+                        # Retry with the body wrapped by sign_for_old_server().
+                        body = KeepClient.sign_for_old_server(
+                            self.args['data_hash'],
+                            self.args['data'])
+                        h = httplib2.Http()
+                        resp, content = h.request(url.encode('utf-8'), 'PUT',
+                                                  headers=headers,
+                                                  body=body)
+                    if re.match(r'^2\d\d$', resp['status']):
+                        logging.debug("KeepWriterThread %s succeeded %s %s" %
+                                      (str(threading.current_thread()),
+                                       self.args['data_hash'],
+                                       self.args['service_root']))
+                        return limiter.increment_done()
+                    logging.warning("Request fail: PUT %s => %s %s" %
+                                    (url, resp['status'], content))
+                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+                    logging.warning("Request fail: PUT %s => %s: %s" %
+                                    (url, type(e), str(e)))
+
     def __init__(self):
         self.lock = threading.Lock()
         self.service_roots = None
@@ -854,36 +961,29 @@ class KeepClient:
         data_hash = m.hexdigest()
         have_copies = 0
         want_copies = kwargs.get('copies', 2)
+        if not (want_copies > 0):
+            return data_hash
+        threads = []
+        thread_limiter = KeepClient.ThreadLimiter(want_copies)
         for service_root in self.shuffled_service_roots(data_hash):
-            h = httplib2.Http()
-            url = service_root + data_hash
-            api_token = os.environ['ARVADOS_API_TOKEN']
-            headers = {'Authorization': "OAuth2 %s" % api_token}
-            try:
-                resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                          headers=headers,
-                                          body=data)
-                if (resp['status'] == '401' and
-                    re.match(r'Timestamp verification failed', content)):
-                    body = self.sign_for_old_server(data_hash, data)
-                    h = httplib2.Http()
-                    resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                              headers=headers,
-                                              body=body)
-                if re.match(r'^2\d\d$', resp['status']):
-                    have_copies += 1
-                    if have_copies == want_copies:
-                        return data_hash + '+' + str(len(data))
-                else:
-                    logging.warning("Request fail: PUT %s => %s %s" %
-                                    (url, resp['status'], content))
-            except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
-                logging.warning("Request fail: PUT %s => %s: %s" %
-                                (url, type(e), str(e)))
+            t = KeepClient.KeepWriterThread(data=data,
+                                            data_hash=data_hash,
+                                            service_root=service_root,
+                                            thread_limiter=thread_limiter)
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+        have_copies = thread_limiter.done()
+        # Succeed when the goal is met or exceeded; an exact-match test
+        # would report a write failure if a surplus copy were written.
+        if have_copies >= want_copies:
+            return (data_hash + '+' + str(len(data)))
         raise Exception("Write fail for %s: wanted %d but wrote %d" %
                         (data_hash, want_copies, have_copies))
 
-    def sign_for_old_server(self, data_hash, data):
+    @staticmethod
+    def sign_for_old_server(data_hash, data):
         return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
 
 
diff --git a/sdk/python/test_keep_client.py b/sdk/python/test_keep_client.py
index c7612d6eab..23fd582480 100644
--- a/sdk/python/test_keep_client.py
+++ b/sdk/python/test_keep_client.py
@@ -6,12 +6,14 @@ import unittest
 import arvados
 import os
 
-class KeepRWTest(unittest.TestCase):
+class KeepTestCase(unittest.TestCase):
     def setUp(self):
         try:
             del os.environ['KEEP_LOCAL_STORE']
         except KeyError:
             pass
+
+class KeepBasicRWTest(KeepTestCase):
     def runTest(self):
         foo_locator = arvados.Keep.put('foo')
         self.assertEqual(foo_locator,
@@ -20,11 +22,41 @@ class KeepRWTest(unittest.TestCase):
         self.assertEqual(arvados.Keep.get(foo_locator),
                          'foo',
                          'wrong content from Keep.get(md5("foo"))')
+
+class KeepBinaryRWTest(KeepTestCase):
+    def runTest(self):
         blob_str = '\xff\xfe\xf7\x00\x01\x02'
         blob_locator = arvados.Keep.put(blob_str)
         self.assertEqual(blob_locator,
                          '7fc7c53b45e53926ba52821140fef396+6',
-                         'wrong md5 hash from Keep.put()')
+                         ('wrong locator from Keep.put():' +
+                          blob_locator))
+        self.assertEqual(arvados.Keep.get(blob_locator),
+                         blob_str,
+                         'wrong content from Keep.get(md5())')
+
+class KeepLongBinaryRWTest(KeepTestCase):
+    def runTest(self):
+        blob_str = '\xff\xfe\xfd\xfc\x00\x01\x02\x03'
+        for i in range(0,23):
+            blob_str = blob_str + blob_str
+        blob_locator = arvados.Keep.put(blob_str)
+        self.assertEqual(blob_locator,
+                         '84d90fc0d8175dd5dcfab04b999bc956+67108864',
+                         ('wrong locator from Keep.put(): ' +
+                          blob_locator))
+        self.assertEqual(arvados.Keep.get(blob_locator),
+                         blob_str,
+                         'wrong content from Keep.get(md5())')
+
+class KeepSingleCopyRWTest(KeepTestCase):
+    def runTest(self):
+        blob_str = '\xff\xfe\xfd\xfc\x00\x01\x02\x03'
+        blob_locator = arvados.Keep.put(blob_str, copies=1)
+        self.assertEqual(blob_locator,
+                         'c902006bc98a3eb4a3663b65ab4a6fab+8',
+                         ('wrong locator from Keep.put(): ' +
+                          blob_locator))
         self.assertEqual(arvados.Keep.get(blob_locator),
                          blob_str,
                          'wrong content from Keep.get(md5())')