+
+
+
+ def test_disk_cache_error(self):
+ os.chmod(self.disk_cache_dir, stat.S_IRUSR)
+
+ # Fail during cache initialization.
+ with self.assertRaises(OSError):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+
+ def test_disk_cache_write_error(self):
+ block_cache = arvados.keep.KeepBlockCache(disk_cache=True,
+ disk_cache_dir=self.disk_cache_dir)
+
+ keep_client = arvados.KeepClient(api_client=self.api_client, block_cache=block_cache)
+
+ # Make the cache dir read-only
+ os.makedirs(os.path.join(self.disk_cache_dir, self.locator[0:3]))
+ os.chmod(os.path.join(self.disk_cache_dir, self.locator[0:3]), stat.S_IRUSR)
+
+ # Cache fails
+ with self.assertRaises(arvados.errors.KeepCacheError):
+ with tutil.mock_keep_responses(self.data, 200) as mock:
+ keep_client.get(self.locator)