import shutil
import socket
import sys
+import tempfile
import time
import unittest
import urllib.parse
# First response was not cached because it was from a HEAD request.
self.assertNotEqual(head_resp, get_resp)
+
+
+
@tutil.skip_sleep
@parameterized.parameterized_class([{"disk_cache": True}, {"disk_cache": False}])
class KeepStorageClassesTestCase(unittest.TestCase, tutil.ApiClientMock, DiskCacheBase):
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
with self.assertRaises(arvados.errors.KeepReadError):
keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+
+
class KeepDiskCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Tests for KeepBlockCache when it is created with disk_cache=True.

    Every test works in its own temporary directory.  Cache files are
    placed at ``<disk_cache_dir>/<first 3 chars of locator>/<file name>``,
    and cache blocks are named ``<locator>.keepcacheblock``.
    """

    # File name suffix the tests use for cache block files.
    _BLOCK_SUFFIX = ".keepcacheblock"

    # Second block used by the cap/share tests.
    # NOTE(review): presumably the locator is the MD5 of the payload - confirm.
    _OTHER_LOCATOR = "acbd18db4cc2f85cedef654fccc4a4d8"
    _OTHER_DATA = b"foo"

    def setUp(self):
        self.api_client = self.mock_keep_services(count=2)
        self.data = b'xyzzy'
        # NOTE(review): presumably the MD5 of self.data - confirm.
        self.locator = '1271ed5ef305aadabc605b1609e24c52'
        self.disk_cache_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.disk_cache_dir)

    # -- helpers (names deliberately do not start with "test") -----------

    def _shard_path(self, shard, filename):
        """Return the path of `filename` inside cache subdirectory `shard`."""
        return os.path.join(self.disk_cache_dir, shard, filename)

    def _block_path(self, locator):
        """Return the path of the cache block file for `locator`."""
        return self._shard_path(locator[0:3], locator + self._BLOCK_SUFFIX)

    def _write_block(self, locator, data):
        """Create the subdirectory for `locator` and write `data` as its cache block."""
        os.makedirs(os.path.join(self.disk_cache_dir, locator[0:3]))
        with open(self._block_path(locator), "wb") as f:
            f.write(data)

    def _new_block_cache(self, **kwargs):
        """Build a disk-backed KeepBlockCache on this test's directory.

        Extra keyword arguments (e.g. max_slots) are passed through.
        """
        return arvados.keep.KeepBlockCache(disk_cache=True,
                                           disk_cache_dir=self.disk_cache_dir,
                                           **kwargs)

    def _new_keep_client(self, block_cache):
        """Build a KeepClient that uses the mocked API client and `block_cache`."""
        return arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)

    # -- tests -----------------------------------------------------------

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read(self, get_mock):
        # confirm it finds an existing cache block when the cache is
        # initialized.

        self._write_block(self.locator, self.data)

        # block cache should have found the existing block
        keep_client = self._new_keep_client(self._new_block_cache())

        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    # This test used to be named test_disk_cache_share, the same name as
    # the last test in this class.  The later definition replaced this one
    # in the class body, so this test was never run.
    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_read_after_init(self, get_mock):
        # confirm it finds a cache block written after the disk cache
        # was initialized.

        keep_client = self._new_keep_client(self._new_block_cache())

        self._write_block(self.locator, self.data)

        # when we try to get the block, it'll check the disk and find it.
        self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        get_mock.assert_not_called()

    def test_disk_cache_write(self):
        # confirm the cache block was created

        keep_client = self._new_keep_client(self._new_block_cache())

        # The context manager's value is not needed; binding it as "mock"
        # would only shadow the mock module inside this method.
        with tutil.mock_keep_responses(self.data, 200):
            self.assertTrue(tutil.binary_compare(keep_client.get(self.locator), self.data))

        with open(self._block_path(self.locator), "rb") as f:
            self.assertTrue(tutil.binary_compare(f.read(), self.data))

    def test_disk_cache_clean(self):
        # confirm that a tmp file in the cache is cleaned up

        shard = self.locator[0:3]
        os.makedirs(os.path.join(self.disk_cache_dir, shard))

        # Only the first file has both the "tmp" prefix and the cache
        # block suffix; the other two must survive the cleanup.
        tmp_block = self._shard_path(shard, "tmpXYZABC.keepcacheblock")
        tmp_plain = self._shard_path(shard, "tmpXYZABC")
        unrelated = self._shard_path(shard, "XYZABC")
        all_paths = (tmp_block, tmp_plain, unrelated)

        for path, content in zip(all_paths, (b"abc1", b"abc2", b"abc3")):
            with open(path, "wb") as f:
                f.write(content)

        for path in all_paths:
            self.assertTrue(os.path.exists(path))

        block_cache = self._new_block_cache()

        # The tmp still hasn't been deleted because it was created in the last 60 seconds
        for path in all_paths:
            self.assertTrue(os.path.exists(path))

        # Set the mtime to 61s in the past
        stale = time.time() - 61
        for path in all_paths:
            os.utime(path, times=(stale, stale))

        block_cache2 = self._new_block_cache()

        # Tmp should be gone but the other ones are safe.
        self.assertFalse(os.path.exists(tmp_block))
        self.assertTrue(os.path.exists(tmp_plain))
        self.assertTrue(os.path.exists(unrelated))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_cap(self, get_mock):
        # confirm that the cache is kept to the desired limit

        self._write_block(self.locator, self.data)
        self._write_block(self._OTHER_LOCATOR, self._OTHER_DATA)

        first_block = self._block_path(self.locator)
        second_block = self._block_path(self._OTHER_LOCATOR)

        self.assertTrue(os.path.exists(first_block))
        self.assertTrue(os.path.exists(second_block))

        block_cache = self._new_block_cache(max_slots=1)

        # With room for a single slot, the block written first is the
        # one that gets removed.
        self.assertFalse(os.path.exists(first_block))
        self.assertTrue(os.path.exists(second_block))

    @mock.patch('arvados.KeepClient.KeepService.get')
    def test_disk_cache_share(self, get_mock):
        # confirm that a second cache doesn't delete files that belong to the first cache.

        self._write_block(self.locator, self.data)
        self._write_block(self._OTHER_LOCATOR, self._OTHER_DATA)

        first_block = self._block_path(self.locator)
        second_block = self._block_path(self._OTHER_LOCATOR)

        self.assertTrue(os.path.exists(first_block))
        self.assertTrue(os.path.exists(second_block))

        # NOTE(review): block_cache is never read again but must stay
        # referenced; presumably the first cache has to be alive for its
        # files to count as in use - confirm against KeepBlockCache.
        block_cache = self._new_block_cache(max_slots=2)

        self.assertTrue(os.path.exists(first_block))
        self.assertTrue(os.path.exists(second_block))

        block_cache2 = self._new_block_cache(max_slots=1)

        # Even though the second cache is capped at one slot, both files
        # are still there.
        self.assertTrue(os.path.exists(first_block))
        self.assertTrue(os.path.exists(second_block))