# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import hashlib
import os
import errno
import pycurl
import random
import re
import shutil
import socket
import sys
import stat
import tempfile
import time
import unittest
import urllib.parse
import mmap

from unittest import mock
from unittest.mock import patch

import parameterized

import arvados
import arvados.retry
import arvados.util
from . import arvados_testutil as tutil
from . import keepstub
from . import run_test_server

from .arvados_testutil import DiskCacheBase


@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Put/get/head round trips through one KeepClient shared by the class.

    The client is created once in setUpClass() and the class is run twice
    by parameterized_class: with and without the disk cache.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    block_cache_test = None

    @classmethod
    def setUpClass(cls):
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        # A standalone DiskCacheBase owns the block cache because the
        # cache must outlive individual test instances.
        cls.block_cache_test = DiskCacheBase()
        cls.keep_client = arvados.KeepClient(
            api_client=cls.api_client,
            proxy='',
            local_store='',
            block_cache=cls.block_cache_test.make_block_cache(cls.disk_cache))

    @classmethod
    def tearDownClass(cls):
        # Undo setUpClass(): this previously called setUpClass() again
        # instead of tearing the class fixture down.
        super(KeepTestCase, cls).tearDownClass()
        cls.block_cache_test.tearDown()

    def test_KeepBasicRWTest(self):
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertTrue(
            tutil.binary_compare(self.keep_client.get(foo_locator), b'foo'),
            'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegex(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put():' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5())')

    def test_KeepLongBinaryRWTest(self):
        # 8 bytes repeated 2**23 times == 67108864 bytes, the size the
        # locator below ends with.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegex(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5())')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegex(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5())')

    def test_KeepEmptyCollectionTest(self):
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegex(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # The Python 2-only non-ASCII check that used to live here was
        # unreachable: this module already requires Python 3
        # (urllib.parse, unittest.mock).

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        locator = self.keep_client.put('test_head')
        self.assertRegex(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')


@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepPermissionTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """Check that reads need a signed locator and a matching token.

    Runs against a Keep server configured with blob signing enabled.
    """
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_KeepBasicRWTest(self):
        run_test_server.authorize_with('active')
        keep_client = arvados.KeepClient(
            block_cache=self.make_block_cache(self.disk_cache))
        foo_locator = keep_client.put('foo')
        self.assertRegex(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + foo_locator)
        self.assertEqual(keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => bad request
        bar_locator = keep_client.put('bar')
        unsigned_bar_locator = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegex(
            bar_locator,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)

        # GET from a different user => bad request
        run_test_server.authorize_with('spectator')
        self.assertRaises(arvados.errors.KeepReadError,
                          arvados.Keep.get,
                          bar_locator)

        # Unauthenticated GET for a signed locator => bad request
        # Unauthenticated GET for an unsigned locator => bad request
        keep_client.api_token = ''
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          bar_locator)
        self.assertRaises(arvados.errors.KeepReadError,
                          keep_client.get,
                          unsigned_bar_locator)


@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepProxyTestCase(run_test_server.TestCaseWithServers, DiskCacheBase):
    """KeepClient behavior when Keep services are reached through a proxy."""
    disk_cache = False
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        super(KeepProxyTestCase, self).tearDown()
        DiskCacheBase.tearDown(self)

    def test_KeepProxyTest1(self):
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        baz_locator = keep_client.put('baz')
        self.assertRegex(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(
            api_client=self.api_client,
            local_store='',
            block_cache=self.make_block_cache(self.disk_cache))
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # Only the constructor's exception matters; the client itself
            # is never used.
            arvados.KeepClient(
                api_client=self.api_client,
                local_store='',
                block_cache=self.make_block_cache(self.disk_cache))


@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """KeepClient service discovery, curl options and error reporting.

    All HTTP traffic is replaced by tutil.mock_keep_responses(), so no
    server is needed.
    """
    disk_cache = False

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def get_service_roots(self, api_client):
        # Return the parsed service root URLs KeepClient derives from
        # api_client, sorted by URL string.
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    def test_recognize_proxy_services_in_controller_response(self):
        keep_client = arvados.KeepClient(
            api_client=self.mock_keep_services(
                service_type='proxy', service_host='localhost', service_port=9, count=1),
            block_cache=self.make_block_cache(self.disk_cache))
        try:
            # this will fail, but it ensures we get the service
            # discovery response
            keep_client.put('baz2', num_retries=0)
        except Exception:
            # The failure is expected and irrelevant here; unlike a bare
            # except this still lets KeyboardInterrupt/SystemExit through.
            pass
        self.assertTrue(keep_client.using_proxy)

    def test_insecure_disables_tls_verify(self):
        api_client = self.mock_keep_services(count=1)

        api_client.insecure = True
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                0)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                0)

        api_client.insecure = False
        with tutil.mock_keep_responses(b'foo', 200) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache))
            keep_client.get('acbd18db4cc2f85cedef654fccc4a4d8+3')
            # getopt()==None here means we didn't change the
            # default. If we were using real pycurl instead of a mock,
            # it would return the default value 1.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYPEER),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.SSL_VERIFYHOST),
                None)

    def test_refresh_signature(self):
        blk_digest = '6f5902ac237024bdd0c176cb93063dc4+11'
        blk_sig = 'da39a3ee5e6b4b0d3255bfef95601890afd80709@53bed294'
        local_loc = blk_digest+'+A'+blk_sig
        remote_loc = blk_digest+'+R'+blk_sig
        api_client = self.mock_keep_services(count=1)
        headers = {'X-Keep-Locator': local_loc}
        with tutil.mock_keep_responses('', 200, **headers):
            # Check that the translated locator gets returned
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache))
            self.assertEqual(local_loc, keep_client.refresh_signature(remote_loc))
            # Check that refresh_signature() uses the correct method and headers
            keep_client._get_or_head = mock.MagicMock()
            keep_client.refresh_signature(remote_loc)
            args, kwargs = keep_client._get_or_head.call_args_list[0]
            self.assertIn(remote_loc, args)
            self.assertEqual("HEAD", kwargs['method'])
            self.assertIn('X-Keep-Signature', kwargs['headers'])

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def test_get_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_put_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put(b'foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))

    def test_head_timeout(self):
        api_client = self.mock_keep_services(count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
            # HEAD requests are expected to leave the low-speed options unset.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_get_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.get('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def test_proxy_head_timeout(self):
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepReadError):
                keep_client.head('ffffffffffffffffffffffffffffffff')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            # HEAD requests are expected to leave the low-speed options unset.
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                None)
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                None)

    def test_proxy_put_timeout(self):
        # NOTE(review): unlike its siblings this test builds the client
        # without an explicit block cache; disk_cache_dir is cleared
        # first, presumably so tearDown() has nothing to clean up --
        # confirm against DiskCacheBase.
        self.disk_cache_dir = None
        api_client = self.mock_keep_services(service_type='proxy', count=1)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                num_retries=0,
            )
            with self.assertRaises(arvados.errors.KeepWriteError):
                keep_client.put('foo')
            self.assertEqual(
                mock.responses[0].getopt(pycurl.CONNECTTIMEOUT_MS),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
            self.assertEqual(
                mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))

    def check_no_services_error(self, verb, exc_class):
        # Call keep_client.<verb>() while service discovery raises
        # ApiError: exc_class must be raised with no per-request errors.
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(
            api_client=api_client,
            block_cache=self.make_block_cache(self.disk_cache),
            num_retries=0,
        )
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        # Two services, four attempts (num_retries=3): only the errors
        # from the final attempt (the two 502s) may be reported.
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 500, 500, 500, 500, 502, 502)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([502, 502], [
            getattr(error, 'status_code', None)
            for error in err_check.exception.request_errors().values()])
        self.assertRegex(str(err_check.exception),
                         r'failed to (read|write) .* after 4 attempts')

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            keep_client.put(data)
        # assertIn reports the actual message on failure, unlike
        # assertEqual(True, ... in ...).
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        body = b'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        body = b'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        body = b'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(
                api_client=api_client,
                block_cache=self.make_block_cache(self.disk_cache),
                num_retries=0,
            )
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        # One response reporting 3 stored replicas satisfies copies=2,
        # so no further request may be issued.
        self.assertEqual(1, req_mock.call_count)


@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check which KeepClient requests are served from the block cache."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_get_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.get(self.locator)
            self.keep_client.get(self.locator)
        # Request already cached, don't require more than one request
        get_mock.assert_called_once()

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_request_cache(self, get_mock):
        with tutil.mock_keep_responses(self.data, 200, 200):
            self.keep_client.head(self.locator)
            self.keep_client.head(self.locator)
        # Don't cache HEAD requests so that they're not confused with GET reqs
        self.assertEqual(2, get_mock.call_count)

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_head_and_then_get_return_different_responses(self, get_mock):
        get_mock.side_effect = [b'first response', b'second response']
        with tutil.mock_keep_responses(self.data, 200, 200):
            head_resp = self.keep_client.head(self.locator)
            get_resp = self.keep_client.get(self.locator)
        self.assertEqual(b'first response', head_resp)
        # First response was not cached because it was from a HEAD request.
        self.assertNotEqual(head_resp, get_resp)


@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check the X-Request-Id header KeepClient sends with each request."""
    disk_cache = False

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))
        self.data = b'xyzzy'
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.test_id = arvados.util.new_request_id()
        self.assertRegex(self.test_id, r'^req-[a-z0-9]{20}$')
        # Set request_id to None explicitly: otherwise reading it from
        # the mocked API client would presumably yield a mock object
        # rather than "no request ID".
        self.api_client.request_id = None

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_default_to_api_client_request_id(self):
        self.api_client.request_id = self.test_id
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertProvidedRequestId(mock.responses[0])

    def test_explicit_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data, request_id=self.test_id)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertProvidedRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator, request_id=self.test_id)
        self.assertProvidedRequestId(mock.responses[0])

    def test_automatic_request_id(self):
        with tutil.mock_keep_responses(self.locator, 200, 200) as mock:
            self.keep_client.put(self.data)
        self.assertEqual(2, len(mock.responses))
        for resp in mock.responses:
            self.assertAutomaticRequestId(resp)

        with tutil.mock_keep_responses(self.data, 200) as mock:
            self.keep_client.get(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

        with tutil.mock_keep_responses(b'', 200) as mock:
            self.keep_client.head(self.locator)
        self.assertAutomaticRequestId(mock.responses[0])

    def test_request_id_in_exception(self):
        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, self.test_id):
                self.keep_client.head(self.locator, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepReadError, r'req-[a-z0-9]{20}'):
                self.keep_client.get(self.locator)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, self.test_id):
                self.keep_client.put(self.data, request_id=self.test_id)

        with tutil.mock_keep_responses(b'', 400, 400, 400) as mock:
            with self.assertRaisesRegex(arvados.errors.KeepWriteError, r'req-[a-z0-9]{20}'):
                self.keep_client.put(self.data)

    def assertAutomaticRequestId(self, resp):
        # The request must carry a well-formed X-Request-Id header that
        # is not the ID generated in setUp().
        hdr = [x for x in resp.getopt(pycurl.HTTPHEADER)
               if x.startswith('X-Request-Id: ')][0]
        self.assertNotEqual(hdr, 'X-Request-Id: '+self.test_id)
        self.assertRegex(hdr, r'^X-Request-Id: req-[a-z0-9]{20}$')

    def assertProvidedRequestId(self, resp):
        # The request must carry exactly the ID generated in setUp().
        self.assertIn('X-Request-Id: '+self.test_id,
                      resp.getopt(pycurl.HTTPHEADER))


@tutil.skip_sleep
#@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
    """Check the order in which KeepClient probes services for a block."""
    disk_cache = False

    def setUp(self):
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
        ]
        self.blocks = [
            "{:064x}".format(x).encode()
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(
            api_client=self.api_client,
            block_cache=self.make_block_cache(self.disk_cache))

    def tearDown(self):
        DiskCacheBase.tearDown(self)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        # Run op(i) for every reference block with all services failing,
        # then compare the requested URLs with the expected probe order.
        # Each service is expected twice: first attempt plus one retry.
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *([500] * (self.services*2))) as mock, \
                    self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                for resp in mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
                        self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL).decode()).group(1)
                    for resp in mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later.
As long as +len(pending) block_cache.cache_max) def test_disk_cache_retry_write_error2(self): block_cache = arvados.keep.KeepBlockCache(disk_cache=True, disk_cache_dir=self.disk_cache_dir) keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache) called = False realmmap = mmap.mmap def sideeffect_mmap(*args, **kwargs): nonlocal called if not called: called = True raise OSError(errno.ENOMEM, "no memory") else: return realmmap(*args, **kwargs) with patch('mmap.mmap') as mockmmap: mockmmap.side_effect = sideeffect_mmap slots_before = block_cache._max_slots with tutil.mock_keep_responses(self.data, 200) as mock: self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data)) self.assertIsNotNone(keep_client.get_from_cache(self.locator)) with open(os.path.join(self.disk_cache_dir, self.locator[0:3], self.locator+".keepcacheblock"), "rb") as f: self.assertTrue(tutil.binary_compare(f.read(), self.data)) # shrank the cache in response to ENOMEM self.assertTrue(slots_before > block_cache._max_slots)