from __future__ import absolute_import from __future__ import division from future import standard_library standard_library.install_aliases() from builtins import str from builtins import range from builtins import object import hashlib import mock import os import pycurl import random import re import socket import sys import threading import time import unittest import urllib.parse import arvados import arvados.retry from . import arvados_testutil as tutil from . import keepstub from . import run_test_server class KeepTestCase(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {} @classmethod def setUpClass(cls): super(KeepTestCase, cls).setUpClass() run_test_server.authorize_with("admin") cls.api_client = arvados.api('v1') cls.keep_client = arvados.KeepClient(api_client=cls.api_client, proxy='', local_store='') def test_KeepBasicRWTest(self): self.assertEqual(0, self.keep_client.upload_counter.get()) foo_locator = self.keep_client.put('foo') self.assertRegex( foo_locator, '^acbd18db4cc2f85cedef654fccc4a4d8\+3', 'wrong md5 hash from Keep.put("foo"): ' + foo_locator) # 6 bytes because uploaded 2 copies self.assertEqual(6, self.keep_client.upload_counter.get()) self.assertEqual(0, self.keep_client.download_counter.get()) self.assertEqual(self.keep_client.get(foo_locator), b'foo', 'wrong content from Keep.get(md5("foo"))') self.assertEqual(3, self.keep_client.download_counter.get()) def test_KeepBinaryRWTest(self): blob_str = b'\xff\xfe\xf7\x00\x01\x02' blob_locator = self.keep_client.put(blob_str) self.assertRegex( blob_locator, '^7fc7c53b45e53926ba52821140fef396\+6', ('wrong locator from Keep.put():' + blob_locator)) self.assertEqual(self.keep_client.get(blob_locator), blob_str, 'wrong content from Keep.get(md5())') def test_KeepLongBinaryRWTest(self): blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' for i in range(0,23): blob_data = blob_data + blob_data blob_locator = self.keep_client.put(blob_data) self.assertRegex( blob_locator, '^84d90fc0d8175dd5dcfab04b999bc956\+67108864', ('wrong locator from Keep.put(): ' + blob_locator)) self.assertEqual(self.keep_client.get(blob_locator), blob_data, 'wrong content from Keep.get(md5())') @unittest.skip("unreliable test - please fix and close #8752") def test_KeepSingleCopyRWTest(self): blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' blob_locator = self.keep_client.put(blob_data, copies=1) self.assertRegex( blob_locator, '^c902006bc98a3eb4a3663b65ab4a6fab\+8', ('wrong locator from Keep.put(): ' + blob_locator)) self.assertEqual(self.keep_client.get(blob_locator), blob_data, 'wrong content from Keep.get(md5())') def test_KeepEmptyCollectionTest(self): blob_locator = self.keep_client.put('', copies=1) self.assertRegex( blob_locator, '^d41d8cd98f00b204e9800998ecf8427e\+0', ('wrong locator from Keep.put(""): ' + blob_locator)) def test_unicode_must_be_ascii(self): # If unicode type, must only consist of valid ASCII foo_locator = self.keep_client.put(u'foo') self.assertRegex( foo_locator, '^acbd18db4cc2f85cedef654fccc4a4d8\+3', 'wrong md5 hash from Keep.put("foo"): ' + foo_locator) if sys.version_info < (3, 0): with self.assertRaises(UnicodeEncodeError): # Error if it is not ASCII self.keep_client.put(u'\xe2') with self.assertRaises(AttributeError): # Must be bytes or have an encode() method self.keep_client.put({}) def test_KeepHeadTest(self): locator = self.keep_client.put('test_head') self.assertRegex( locator, '^b9a772c7049325feb7130fff1f8333e9\+9', 'wrong md5 hash from Keep.put for "test_head": ' + locator) self.assertEqual(True, self.keep_client.head(locator)) self.assertEqual(self.keep_client.get(locator), b'test_head', 'wrong content from Keep.get for "test_head"') class KeepPermissionTestCase(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789', 'enforce_permissions': True} def test_KeepBasicRWTest(self): run_test_server.authorize_with('active') keep_client = arvados.KeepClient() foo_locator = keep_client.put('foo') self.assertRegex( foo_locator, r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$', 'invalid locator from Keep.put("foo"): ' + foo_locator) self.assertEqual(keep_client.get(foo_locator), b'foo', 'wrong content from Keep.get(md5("foo"))') # GET with an unsigned locator => NotFound bar_locator = keep_client.put('bar') unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3" self.assertRegex( bar_locator, r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$', 'invalid locator from Keep.put("bar"): ' + bar_locator) self.assertRaises(arvados.errors.NotFoundError, keep_client.get, unsigned_bar_locator) # GET from a different user => NotFound run_test_server.authorize_with('spectator') self.assertRaises(arvados.errors.NotFoundError, arvados.Keep.get, bar_locator) # Unauthenticated GET for a signed locator => NotFound # Unauthenticated GET for an unsigned locator => NotFound keep_client.api_token = '' self.assertRaises(arvados.errors.NotFoundError, keep_client.get, bar_locator) self.assertRaises(arvados.errors.NotFoundError, keep_client.get, unsigned_bar_locator) # KeepOptionalPermission: starts Keep with --permission-key-file # but not --enforce-permissions (i.e. generate signatures on PUT # requests, but do not require them for GET requests) # # All of these requests should succeed when permissions are optional: # * authenticated request, signed locator # * authenticated request, unsigned locator # * unauthenticated request, signed locator # * unauthenticated request, unsigned locator class KeepOptionalPermission(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789', 'enforce_permissions': False} @classmethod def setUpClass(cls): super(KeepOptionalPermission, cls).setUpClass() run_test_server.authorize_with("admin") cls.api_client = arvados.api('v1') def setUp(self): super(KeepOptionalPermission, self).setUp() self.keep_client = arvados.KeepClient(api_client=self.api_client, proxy='', local_store='') def _put_foo_and_check(self): signed_locator = self.keep_client.put('foo') self.assertRegex( signed_locator, r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$', 'invalid locator from Keep.put("foo"): ' + signed_locator) return signed_locator def test_KeepAuthenticatedSignedTest(self): signed_locator = self._put_foo_and_check() self.assertEqual(self.keep_client.get(signed_locator), b'foo', 'wrong content from Keep.get(md5("foo"))') def test_KeepAuthenticatedUnsignedTest(self): signed_locator = self._put_foo_and_check() self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"), b'foo', 'wrong content from Keep.get(md5("foo"))') def test_KeepUnauthenticatedSignedTest(self): # Check that signed GET requests work even when permissions # enforcement is off. signed_locator = self._put_foo_and_check() self.keep_client.api_token = '' self.assertEqual(self.keep_client.get(signed_locator), b'foo', 'wrong content from Keep.get(md5("foo"))') def test_KeepUnauthenticatedUnsignedTest(self): # Since --enforce-permissions is not in effect, GET requests # need not be authenticated. signed_locator = self._put_foo_and_check() self.keep_client.api_token = '' self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"), b'foo', 'wrong content from Keep.get(md5("foo"))') class KeepProxyTestCase(run_test_server.TestCaseWithServers): MAIN_SERVER = {} KEEP_SERVER = {} KEEP_PROXY_SERVER = {} @classmethod def setUpClass(cls): super(KeepProxyTestCase, cls).setUpClass() run_test_server.authorize_with('active') cls.api_client = arvados.api('v1') def tearDown(self): arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None) super(KeepProxyTestCase, self).tearDown() def test_KeepProxyTest1(self): # Will use ARVADOS_KEEP_SERVICES environment variable that # is set by setUpClass(). keep_client = arvados.KeepClient(api_client=self.api_client, local_store='') baz_locator = keep_client.put('baz') self.assertRegex( baz_locator, '^73feffa4b7f6bb68e44cf984c85f6e88\+3', 'wrong md5 hash from Keep.put("baz"): ' + baz_locator) self.assertEqual(keep_client.get(baz_locator), b'baz', 'wrong content from Keep.get(md5("baz"))') self.assertTrue(keep_client.using_proxy) def test_KeepProxyTest2(self): # Don't instantiate the proxy directly, but set the X-External-Client # header. The API server should direct us to the proxy. arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true' keep_client = arvados.KeepClient(api_client=self.api_client, proxy='', local_store='') baz_locator = keep_client.put('baz2') self.assertRegex( baz_locator, '^91f372a266fe2bf2823cb8ec7fda31ce\+4', 'wrong md5 hash from Keep.put("baz2"): ' + baz_locator) self.assertEqual(keep_client.get(baz_locator), b'baz2', 'wrong content from Keep.get(md5("baz2"))') self.assertTrue(keep_client.using_proxy) def test_KeepProxyTestMultipleURIs(self): # Test using ARVADOS_KEEP_SERVICES env var overriding any # existing proxy setting and setting multiple proxies arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/' keep_client = arvados.KeepClient(api_client=self.api_client, local_store='') uris = [x['_service_root'] for x in keep_client._keep_services] self.assertEqual(uris, ['http://10.0.0.1/', 'https://foo.example.org:1234/']) def test_KeepProxyTestInvalidURI(self): arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org' with self.assertRaises(arvados.errors.ArgumentError): keep_client = arvados.KeepClient(api_client=self.api_client, local_store='') class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock): def get_service_roots(self, api_client): keep_client = arvados.KeepClient(api_client=api_client) services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32)) return [urllib.parse.urlparse(url) for url in sorted(services)] def test_ssl_flag_respected_in_roots(self): for ssl_flag in [False, True]: services = self.get_service_roots(self.mock_keep_services( service_ssl_flag=ssl_flag)) self.assertEqual( ('https' if ssl_flag else 'http'), services[0].scheme) def test_correct_ports_with_ipv6_addresses(self): service = self.get_service_roots(self.mock_keep_services( service_type='proxy', service_host='100::1', service_port=10, count=1))[0] self.assertEqual('100::1', service.hostname) self.assertEqual(10, service.port) # test_*_timeout verify that KeepClient instructs pycurl to use # the appropriate connection and read timeouts. They don't care # whether pycurl actually exhibits the expected timeout behavior # -- those tests are in the KeepClientTimeout test class. def test_get_timeout(self): api_client = self.mock_keep_services(count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepReadError): keep_client.get('ffffffffffffffffffffffffffffffff') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_TIMEOUT[2])) def test_put_timeout(self): api_client = self.mock_keep_services(count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepWriteError): keep_client.put(b'foo') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_TIMEOUT[2])) def test_head_timeout(self): api_client = self.mock_keep_services(count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepReadError): keep_client.head('ffffffffffffffffffffffffffffffff') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_TIMEOUT[2])) def test_proxy_get_timeout(self): api_client = self.mock_keep_services(service_type='proxy', count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepReadError): keep_client.get('ffffffffffffffffffffffffffffffff') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2])) def test_proxy_head_timeout(self): api_client = self.mock_keep_services(service_type='proxy', count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepReadError): keep_client.head('ffffffffffffffffffffffffffffffff') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2])) def test_proxy_put_timeout(self): api_client = self.mock_keep_services(service_type='proxy', count=1) force_timeout = socket.timeout("timed out") with tutil.mock_keep_responses(force_timeout, 0) as mock: keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(arvados.errors.KeepWriteError): keep_client.put('foo') self.assertEqual( mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000)) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_TIME), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1])) self.assertEqual( mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT), int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2])) def check_no_services_error(self, verb, exc_class): api_client = mock.MagicMock(name='api_client') api_client.keep_services().accessible().execute.side_effect = ( arvados.errors.ApiError) keep_client = arvados.KeepClient(api_client=api_client) with self.assertRaises(exc_class) as err_check: getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0') self.assertEqual(0, len(err_check.exception.request_errors())) def test_get_error_with_no_services(self): self.check_no_services_error('get', arvados.errors.KeepReadError) def test_head_error_with_no_services(self): self.check_no_services_error('head', arvados.errors.KeepReadError) def test_put_error_with_no_services(self): self.check_no_services_error('put', arvados.errors.KeepWriteError) def check_errors_from_last_retry(self, verb, exc_class): api_client = self.mock_keep_services(count=2) req_mock = tutil.mock_keep_responses( "retry error reporting test", 500, 500, 403, 403) with req_mock, tutil.skip_sleep, \ self.assertRaises(exc_class) as err_check: keep_client = arvados.KeepClient(api_client=api_client) getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0', num_retries=3) self.assertEqual([403, 403], [ getattr(error, 'status_code', None) for error in err_check.exception.request_errors().values()]) def test_get_error_reflects_last_retry(self): self.check_errors_from_last_retry('get', arvados.errors.KeepReadError) def test_head_error_reflects_last_retry(self): self.check_errors_from_last_retry('head', arvados.errors.KeepReadError) def test_put_error_reflects_last_retry(self): self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError) def test_put_error_does_not_include_successful_puts(self): data = 'partial failure test' data_loc = tutil.str_keep_locator(data) api_client = self.mock_keep_services(count=3) with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \ self.assertRaises(arvados.errors.KeepWriteError) as exc_check: keep_client = arvados.KeepClient(api_client=api_client) keep_client.put(data) self.assertEqual(2, len(exc_check.exception.request_errors())) def test_proxy_put_with_no_writable_services(self): data = 'test with no writable services' data_loc = tutil.str_keep_locator(data) api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1) with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \ self.assertRaises(arvados.errors.KeepWriteError) as exc_check: keep_client = arvados.KeepClient(api_client=api_client) keep_client.put(data) self.assertEqual(True, ("no Keep services available" in str(exc_check.exception))) self.assertEqual(0, len(exc_check.exception.request_errors())) def test_oddball_service_get(self): body = b'oddball service get' api_client = self.mock_keep_services(service_type='fancynewblobstore') with tutil.mock_keep_responses(body, 200): keep_client = arvados.KeepClient(api_client=api_client) actual = keep_client.get(tutil.str_keep_locator(body)) self.assertEqual(body, actual) def test_oddball_service_put(self): body = b'oddball service put' pdh = tutil.str_keep_locator(body) api_client = self.mock_keep_services(service_type='fancynewblobstore') with tutil.mock_keep_responses(pdh, 200): keep_client = arvados.KeepClient(api_client=api_client) actual = keep_client.put(body, copies=1) self.assertEqual(pdh, actual) def test_oddball_service_writer_count(self): body = b'oddball service writer count' pdh = tutil.str_keep_locator(body) api_client = self.mock_keep_services(service_type='fancynewblobstore', count=4) headers = {'x-keep-replicas-stored': 3} with tutil.mock_keep_responses(pdh, 200, 418, 418, 418, **headers) as req_mock: keep_client = arvados.KeepClient(api_client=api_client) actual = keep_client.put(body, copies=2) self.assertEqual(pdh, actual) self.assertEqual(1, req_mock.call_count) @tutil.skip_sleep class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock): def setUp(self): # expected_order[i] is the probe order for # hash=md5(sprintf("%064x",i)) where there are 16 services # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g., # the first probe for the block consisting of 64 "0" # characters is the service whose uuid is # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'. self.services = 16 self.expected_order = [ list('3eab2d5fc9681074'), list('097dba52e648f1c3'), list('c5b4e023f8a7d691'), list('9d81c02e76a3bf54'), ] self.blocks = [ "{:064x}".format(x).encode() for x in range(len(self.expected_order))] self.hashes = [ hashlib.md5(self.blocks[x]).hexdigest() for x in range(len(self.expected_order))] self.api_client = self.mock_keep_services(count=self.services) self.keep_client = arvados.KeepClient(api_client=self.api_client) def test_weighted_service_roots_against_reference_set(self): # Confirm weighted_service_roots() returns the correct order for i, hash in enumerate(self.hashes): roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) got_order = [ re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1) for root in roots] self.assertEqual(self.expected_order[i], got_order) def test_get_probe_order_against_reference_set(self): self._test_probe_order_against_reference_set( lambda i: self.keep_client.get(self.hashes[i], num_retries=1)) def test_head_probe_order_against_reference_set(self): self._test_probe_order_against_reference_set( lambda i: self.keep_client.head(self.hashes[i], num_retries=1)) def test_put_probe_order_against_reference_set(self): # copies=1 prevents the test from being sensitive to races # between writer threads. self._test_probe_order_against_reference_set( lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1)) def _test_probe_order_against_reference_set(self, op): for i in range(len(self.blocks)): with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \ self.assertRaises(arvados.errors.KeepRequestError): op(i) got_order = [ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1) for resp in mock.responses] self.assertEqual(self.expected_order[i]*2, got_order) def test_put_probe_order_multiple_copies(self): for copies in range(2, 4): for i in range(len(self.blocks)): with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \ self.assertRaises(arvados.errors.KeepWriteError): self.keep_client.put(self.blocks[i], num_retries=2, copies=copies) got_order = [ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1) for resp in mock.responses] # With T threads racing to make requests, the position # of a given server in the sequence of HTTP requests # (got_order) cannot be more than T-1 positions # earlier than that server's position in the reference # probe sequence (expected_order). # # Loop invariant: we have accounted for +pos+ expected # probes, either by seeing them in +got_order+ or by # putting them in +pending+ in the hope of seeing them # later. As long as +len(pending)