From 4dc84fdc1f380b9d796308972648e6e36299684a Mon Sep 17 00:00:00 2001 From: Brett Smith Date: Thu, 30 Apr 2015 09:19:15 -0400 Subject: [PATCH] 3793: Add Docker image cleaner service for compute nodes. This service monitors Docker events. When no containers are active, it arranges to keep the most recently used images up to a configured size limit, then deletes the rest. This will prevent Docker images from growing indefinitely on physical compute nodes. --- services/dockercleaner/.gitignore | 1 + .../dockercleaner/arvados_docker/__init__.py | 0 .../dockercleaner/arvados_docker/cleaner.py | 242 ++++++++++++ services/dockercleaner/gittaggers.py | 1 + services/dockercleaner/setup.py | 33 ++ services/dockercleaner/tests/__init__.py | 4 + services/dockercleaner/tests/test_cleaner.py | 362 ++++++++++++++++++ 7 files changed, 643 insertions(+) create mode 120000 services/dockercleaner/.gitignore create mode 100644 services/dockercleaner/arvados_docker/__init__.py create mode 100644 services/dockercleaner/arvados_docker/cleaner.py create mode 120000 services/dockercleaner/gittaggers.py create mode 100644 services/dockercleaner/setup.py create mode 100644 services/dockercleaner/tests/__init__.py create mode 100644 services/dockercleaner/tests/test_cleaner.py diff --git a/services/dockercleaner/.gitignore b/services/dockercleaner/.gitignore new file mode 120000 index 0000000000..ed3b3622f2 --- /dev/null +++ b/services/dockercleaner/.gitignore @@ -0,0 +1 @@ +../../sdk/python/.gitignore \ No newline at end of file diff --git a/services/dockercleaner/arvados_docker/__init__.py b/services/dockercleaner/arvados_docker/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/services/dockercleaner/arvados_docker/cleaner.py b/services/dockercleaner/arvados_docker/cleaner.py new file mode 100644 index 0000000000..a87741d3ed --- /dev/null +++ b/services/dockercleaner/arvados_docker/cleaner.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python3 +"""arvados_docker.cleaner - 
# Multipliers for the binary size suffixes understood by human_size():
# k = 1024, m = 1024 ** 2, g = 1024 ** 3, t = 1024 ** 4.
SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}

logger = logging.getLogger('arvados_docker.cleaner')


def return_when_docker_not_found(result=None):
    """Build a decorator that converts Docker 404 errors to a return value.

    If the decorated function raises a docker.errors.APIError whose HTTP
    status is 404, the wrapper returns `result` instead.  Any other
    APIError is re-raised unchanged.
    """
    def docker_not_found_decorator(orig_func):
        @functools.wraps(orig_func)
        def docker_not_found_wrapper(*args, **kwargs):
            try:
                return orig_func(*args, **kwargs)
            except docker.errors.APIError as error:
                # error.response may be None; treat that as "not a 404" so
                # the original error propagates instead of an
                # AttributeError that would hide it.
                if getattr(error.response, 'status_code', None) != 404:
                    raise
                return result
        return docker_not_found_wrapper
    return docker_not_found_decorator


class DockerImage:
    """Track the ID, size, and most recent use time of one Docker image."""

    def __init__(self, image_hash):
        """Initialize from an image dictionary as reported by Docker.

        `image_hash` must have an 'Id' key, and a 'VirtualSize' or 'Size'
        key giving the image size in bytes.
        """
        self.docker_id = image_hash['Id']
        try:
            self.size = image_hash['VirtualSize']
        except KeyError:
            # Fall back to 'Size' for daemons that do not report
            # 'VirtualSize'.
            self.size = image_hash['Size']
        # -1 sorts before any real event timestamp, so images that were
        # never used are the first candidates for deletion.
        self.last_used = -1

    def used_at(self, use_time):
        """Record a use at `use_time`; last_used never moves backward."""
        self.last_used = max(self.last_used, use_time)


class DockerImages:
    """Registry of known images and the containers currently using them."""

    def __init__(self, target_size):
        """`target_size` is the total image size, in bytes, to keep."""
        self.target_size = target_size
        self.images = {}                # image ID -> DockerImage
        self.container_image_map = {}   # container ID -> image ID

    @classmethod
    def from_daemon(cls, target_size, docker_client):
        """Return a registry loaded with every image docker_client lists."""
        images = cls(target_size)
        for image in docker_client.images():
            images.add_image(image)
        return images

    def add_image(self, image_hash):
        """Register (or replace) the image described by `image_hash`."""
        image = DockerImage(image_hash)
        self.images[image.docker_id] = image
        logger.debug("Registered image %s", image.docker_id)

    def del_image(self, image_id):
        """Forget `image_id` and every container recorded as using it.

        Unknown image IDs are ignored.
        """
        if image_id in self.images:
            del self.images[image_id]
            self.container_image_map = {
                cid: cid_image
                for cid, cid_image in self.container_image_map.items()
                if cid_image != image_id}
            logger.debug("Unregistered image %s", image_id)

    def has_image(self, image_id):
        """Return True if `image_id` is registered."""
        return image_id in self.images

    def add_user(self, container_hash, use_time):
        """Record that a container started using its image at `use_time`.

        Containers whose image is not registered are ignored.
        """
        image_id = container_hash['Image']
        if image_id in self.images:
            self.container_image_map[container_hash['Id']] = image_id
            self.images[image_id].used_at(use_time)
            logger.debug("Registered container %s using image %s",
                         container_hash['Id'], image_id)

    def end_user(self, cid):
        """Record that container `cid` no longer uses any image."""
        self.container_image_map.pop(cid, None)
        logger.debug("Unregistered container %s", cid)

    def any_users(self):
        """Return True if any registered container is using an image."""
        return bool(self.container_image_map)

    def should_delete(self):
        """Yield the IDs of images to delete, least recently used first.

        Images in use by a container are never yielded, and their sizes
        count against target_size.  The remaining images are considered
        from most to least recently used; each one that still fits in the
        remaining space is kept, and all others are yielded.
        """
        used_images = set(self.container_image_map.values())
        space_left = self.target_size - sum(
            self.images[image_id].size for image_id in used_images)
        lru_images = sorted(
            (image for image in self.images.values()
             if image.docker_id not in used_images),
            key=lambda image: image.last_used)
        keep_ids = set()
        for image in reversed(lru_images):
            if image.size <= space_left:
                keep_ids.add(image.docker_id)
                space_left -= image.size
        for image in lru_images:
            if image.docker_id not in keep_ids:
                yield image.docker_id


class DockerEventHandlers:
    # This class maps Docker event types to the names of methods that should
    # receive those events.
    def __init__(self):
        self.handler_names = collections.defaultdict(list)

    def on(self, *status_names):
        """Return a decorator registering a method for `status_names`.

        The method is recorded by name and returned unchanged, so
        subclasses that override it are dispatched to correctly.
        """
        def register_handler(handler_method):
            for status in status_names:
                self.handler_names[status].append(handler_method.__name__)
            return handler_method
        return register_handler

    def for_event(self, status):
        """Return an iterator over the handler names for `status`.

        Uses .get() rather than indexing so that looking up an unhandled
        status does not insert a new empty entry into the defaultdict;
        a long-running listener sees arbitrary status strings.
        """
        return iter(self.handler_names.get(status, ()))

    def copy(self):
        """Return an independent copy, for subclasses to extend."""
        result = self.__class__()
        result.handler_names = copy.deepcopy(self.handler_names)
        return result


class DockerEventListener:
    # To use this class, define event_handlers as an instance of
    # DockerEventHandlers.  Call run() to iterate over events and call the
    # handler methods as they come in.
    ENCODING = 'utf-8'

    def __init__(self, events):
        """`events` is an iterable of JSON-encoded event bytestrings."""
        self.events = events

    def run(self):
        """Decode each event and pass it to every handler for its status.

        Events that carry no 'status' key are skipped rather than raising
        KeyError, so one unexpected event cannot stop the loop.
        """
        for event in self.events:
            event = json.loads(event.decode(self.ENCODING))
            status = event.get('status')
            for method_name in self.event_handlers.for_event(status):
                getattr(self, method_name)(event)


class DockerImageUseRecorder(DockerEventListener):
    """Update a DockerImages registry from container create/destroy events."""
    event_handlers = DockerEventHandlers()

    def __init__(self, images, docker_client, events):
        self.images = images
        self.docker_client = docker_client
        super().__init__(events)

    @event_handlers.on('create')
    @return_when_docker_not_found()
    def load_container(self, event):
        """Inspect the created container and record it as an image user.

        A 404 from Docker at any point in this handler is ignored.
        """
        container_hash = self.docker_client.inspect_container(event['id'])
        self.new_container(event, container_hash)

    def new_container(self, event, container_hash):
        """Register the container as a user of its image at the event time."""
        self.images.add_user(container_hash, event['time'])

    @event_handlers.on('destroy')
    def container_stopped(self, event):
        """Unregister the destroyed container."""
        self.images.end_user(event['id'])


class DockerImageCleaner(DockerImageUseRecorder):
    """Use recorder that also registers new images and deletes old ones."""
    # Copy the parent's handlers so clean_images is registered only for this
    # class.  It is appended after container_stopped, so a 'destroy' event
    # unregisters the container before the cleanup check runs.
    event_handlers = DockerImageUseRecorder.event_handlers.copy()

    def new_container(self, event, container_hash):
        """Register the container's image first if it is not yet known."""
        container_image_id = container_hash['Image']
        if not self.images.has_image(container_image_id):
            image_hash = self.docker_client.inspect_image(container_image_id)
            self.images.add_image(image_hash)
        return super().new_container(event, container_hash)

    @event_handlers.on('destroy')
    def clean_images(self, event=None):
        """Delete surplus images, but only when no container is registered.

        A failed removal is logged and the image stays registered.
        """
        if self.images.any_users():
            return
        for image_id in self.images.should_delete():
            try:
                self.docker_client.remove_image(image_id)
            except docker.errors.APIError as error:
                logger.warning("Failed to remove image %s: %s",
                               image_id, error)
            else:
                logger.info("Removed image %s", image_id)
                self.images.del_image(image_id)


def human_size(size_str):
    """Convert a size string such as '50G', '10kb' or '512' to bytes.

    The string is case-insensitive.  Trailing 'b' characters are ignored,
    and a k/m/g/t suffix multiplies by the matching power of 1024.
    Raises ValueError when no valid integer remains, including for empty
    input, so argparse reports a usage error instead of a traceback.
    """
    size_str = size_str.lower().rstrip('b')
    # Slicing (not indexing) keeps an empty string from raising IndexError;
    # int('') below then raises the ValueError callers expect.
    multiplier = SUFFIX_SIZES.get(size_str[-1:])
    if multiplier is None:
        multiplier = 1
    else:
        size_str = size_str[:-1]
    return int(size_str) * multiplier


def parse_arguments(arguments):
    """Parse the command-line `arguments` list and return the namespace."""
    parser = argparse.ArgumentParser(
        prog="dockerclean",
        description="clean old Docker images from Arvados compute nodes")
    parser.add_argument(
        '--keep', action='store', type=human_size, required=True,
        help="size of Docker images to keep, suffixed with K/M/G/T")
    parser.add_argument(
        '--verbose', '-v', action='count', default=0,
        help="log more information")
    return parser.parse_args(arguments)


def setup_logging(args):
    """Attach a stream handler and set the level from args.verbose.

    The level starts at ERROR and drops one step per -v.  It is clamped
    at DEBUG because a computed level of 0 is NOTSET, which would defer
    to the parent logger's level instead of logging everything.
    """
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    logger.addHandler(log_handler)
    logger.setLevel(max(logging.DEBUG, logging.ERROR - (10 * args.verbose)))


def run(args, docker_client):
    """Replay past Docker events, then clean images as new events arrive."""
    start_time = int(time.time())
    logger.debug("Loading Docker activity through present")
    images = DockerImages.from_daemon(args.keep, docker_client)
    # First pass: only record image use from the event history.
    use_recorder = DockerImageUseRecorder(
        images, docker_client, docker_client.events(since=1, until=start_time))
    use_recorder.run()
    # Second pass: pick up where the history ended and start deleting.
    cleaner = DockerImageCleaner(
        images, docker_client, docker_client.events(since=start_time))
    logger.info("Starting cleanup loop")
    cleaner.clean_images()
    cleaner.run()


def main(arguments):
    """Command-line entry point."""
    args = parse_arguments(arguments)
    setup_logging(args)
    run(args, docker.Client())


if __name__ == '__main__':
    main(sys.argv[1:])
# Largest value that fits in a 64-hex-digit Docker ID.
MAX_DOCKER_ID = (16 ** 64) - 1


def MockDockerId():
    """Return a random 64-character lowercase hex string."""
    return '{:064x}'.format(random.randint(0, MAX_DOCKER_ID))


def MockContainer(image_hash):
    """Return a container dictionary that uses the image in `image_hash`."""
    return {'Id': MockDockerId(),
            'Image': image_hash['Id']}


def MockImage(*, size=0, vsize=None, tags=()):
    """Return an image dictionary; a random VirtualSize is used by default.

    `tags` defaults to an immutable empty tuple instead of a shared
    mutable list.
    """
    if vsize is None:
        vsize = random.randint(100, 2000000)
    return {'Id': MockDockerId(),
            'ParentId': MockDockerId(),
            'RepoTags': list(tags),
            'Size': size,
            'VirtualSize': vsize}


class MockEvent(dict):
    """Docker event dictionary with 'status', 'id' and 'time' filled in."""
    ENCODING = 'utf-8'
    # Shared counter, so every event gets a distinct, increasing time
    # unless the caller passes one.
    event_seq = itertools.count(1)

    def __init__(self, status, docker_id=None, **event_data):
        if docker_id is None:
            docker_id = MockDockerId()
        # Only the keyword data is needed; the instance is still empty here.
        super().__init__(**event_data)
        self['status'] = status
        self['id'] = docker_id
        self.setdefault('time', next(self.event_seq))

    def encoded(self):
        """Return the event as JSON-encoded bytes."""
        return json.dumps(self).encode(self.ENCODING)


class MockException(docker.errors.APIError):
    """APIError whose response reports the given HTTP status code."""

    def __init__(self, status_code):
        response = mock.Mock(name='response')
        response.status_code = status_code
        super().__init__("mock exception", response)


class DockerImageTestCase(unittest.TestCase):
    """Tests for cleaner.DockerImage use-time tracking."""

    def test_used_at_sets_last_used(self):
        image = cleaner.DockerImage(MockImage())
        image.used_at(5)
        self.assertEqual(5, image.last_used)

    def test_used_at_moves_forward(self):
        image = cleaner.DockerImage(MockImage())
        image.used_at(6)
        image.used_at(8)
        self.assertEqual(8, image.last_used)

    def test_used_at_does_not_go_backward(self):
        image = cleaner.DockerImage(MockImage())
        image.used_at(4)
        image.used_at(2)
        self.assertEqual(4, image.last_used)


class DockerImagesTestCase(unittest.TestCase):
    """Tests for cleaner.DockerImages bookkeeping and deletion choices."""

    def setUp(self):
        self.mock_images = []

    def setup_mock_images(self, *vsizes):
        # One mock image per entry; None means "random size".
        self.mock_images.extend(MockImage(vsize=vsize) for vsize in vsizes)

    def setup_images(self, *vsizes, target_size=1000000):
        self.setup_mock_images(*vsizes)
        images = cleaner.DockerImages(target_size)
        for image in self.mock_images:
            images.add_image(image)
        return images

    def test_has_image(self):
        images = self.setup_images(None)
        self.assertTrue(images.has_image(self.mock_images[0]['Id']))
        self.assertFalse(images.has_image(MockDockerId()))

    def test_del_image(self):
        images = self.setup_images(None)
        images.del_image(self.mock_images[0]['Id'])
        self.assertFalse(images.has_image(self.mock_images[0]['Id']))

    def test_del_nonexistent_image(self):
        images = self.setup_images(None)
        images.del_image(MockDockerId())
        self.assertTrue(images.has_image(self.mock_images[0]['Id']))

    def test_no_users_at_start(self):
        images = self.setup_images(None)
        self.assertFalse(images.any_users())

    def test_users_recorded(self):
        images = self.setup_images(None)
        images.add_user(MockContainer(self.mock_images[-1]), 1)
        self.assertTrue(images.any_users())

    def test_users_unrecorded(self):
        images = self.setup_images(None)
        user = MockContainer(self.mock_images[-1])
        images.add_user(user, 1)
        images.end_user(user['Id'])
        self.assertFalse(images.any_users())

    def test_users_can_restart(self):
        images = self.setup_images(None)
        user = MockContainer(self.mock_images[-1])
        images.add_user(user, 1)
        images.end_user(user['Id'])
        images.add_user(user, 2)
        self.assertTrue(images.any_users())

    def test_multiple_users(self):
        images = self.setup_images(None, None)
        users = [MockContainer(image) for image in self.mock_images]
        images.add_user(users[0], 1)
        images.add_user(users[1], 2)
        images.end_user(users[0]['Id'])
        self.assertTrue(images.any_users())
        images.end_user(users[1]['Id'])
        self.assertFalse(images.any_users())

    def test_one_image_multiple_users(self):
        images = self.setup_images(None)
        users = [MockContainer(self.mock_images[0]) for ii in range(2)]
        images.add_user(users[0], 1)
        images.add_user(users[1], 2)
        images.end_user(users[0]['Id'])
        self.assertTrue(images.any_users())
        images.end_user(users[1]['Id'])
        self.assertFalse(images.any_users())

    def test_nonexistent_user_added(self):
        images = self.setup_images()
        images.add_user(MockContainer(MockImage()), 1)
        self.assertFalse(images.any_users())

    def test_nonexistent_user_removed(self):
        images = self.setup_images()
        images.end_user('nonexistent')
        self.assertFalse(images.any_users())

    def test_nonexistent_user_removed_amongst_real_users(self):
        images = self.setup_images(None)
        user = MockContainer(self.mock_images[-1])
        images.add_user(user, 1)
        images.add_user(MockContainer(MockImage()), 2)
        self.assertTrue(images.any_users())
        images.end_user(user['Id'])
        self.assertFalse(images.any_users())

    def test_del_image_removes_users(self):
        images = self.setup_images(None)
        user = MockContainer(self.mock_images[0])
        images.add_user(user, 1)
        images.del_image(self.mock_images[0]['Id'])
        self.assertFalse(images.any_users())

    def test_images_under_target_not_deletable(self):
        images = self.setup_images(200, 100, 300, target_size=150)
        self.assertEqual({self.mock_images[ii]['Id'] for ii in (0, 2)},
                         set(images.should_delete()))

    def test_all_images_deletable(self):
        images = self.setup_images(None, None, target_size=1)
        self.assertEqual({image['Id'] for image in self.mock_images},
                         set(images.should_delete()))

    def test_images_in_use_not_deletable(self):
        images = self.setup_images(None, None, target_size=1)
        users = [MockContainer(image) for image in self.mock_images]
        images.add_user(users[0], 1)
        images.add_user(users[1], 2)
        images.end_user(users[1]['Id'])
        self.assertEqual([self.mock_images[1]['Id']],
                         list(images.should_delete()))

    def test_images_suggested_for_deletion_by_lru(self):
        images = self.setup_images(10, 10, 10, target_size=1)
        users = [MockContainer(image) for image in self.mock_images]
        images.add_user(users[0], 3)
        images.add_user(users[1], 1)
        images.add_user(users[2], 2)
        for user in users:
            images.end_user(user['Id'])
        self.assertEqual([self.mock_images[ii]['Id'] for ii in (1, 2, 0)],
                         list(images.should_delete()))

    def setup_from_daemon(self, *vsizes, target_size=1500000):
        self.setup_mock_images(*vsizes)
        docker_client = mock.MagicMock(name='docker_client')
        docker_client.images.return_value = iter(self.mock_images)
        return cleaner.DockerImages.from_daemon(target_size, docker_client)

    def test_images_loaded_from_daemon(self):
        images = self.setup_from_daemon(None, None)
        for image in self.mock_images:
            self.assertTrue(images.has_image(image['Id']))

    def test_target_size_set_from_daemon(self):
        images = self.setup_from_daemon(20, 10, target_size=15)
        self.assertEqual([self.mock_images[0]['Id']],
                         list(images.should_delete()))


class DockerImageUseRecorderTestCase(unittest.TestCase):
    """Tests for event handling in cleaner.DockerImageUseRecorder.

    Subclasses set TEST_CLASS to rerun these tests against a subclass of
    the recorder.
    """
    TEST_CLASS = cleaner.DockerImageUseRecorder

    def setUp(self):
        self.images = mock.MagicMock(name='images')
        self.docker_client = mock.MagicMock(name='docker_client')
        self.events = []
        self.recorder = self.TEST_CLASS(self.images, self.docker_client,
                                        self.encoded_events)

    @property
    def encoded_events(self):
        # Lazy generator: events appended to self.events after setUp are
        # still seen when the recorder runs.
        return (event.encoded() for event in self.events)

    def test_unknown_events_ignored(self):
        self.events.append(MockEvent('mock!event'))
        self.recorder.run()
        # No exception should be raised.

    def test_fetches_container_on_create(self):
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.docker_client.inspect_container.assert_called_with(
            self.events[0]['id'])

    def test_adds_user_on_container_create(self):
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.images.add_user.assert_called_with(
            self.docker_client.inspect_container(), self.events[0]['time'])

    def test_unknown_image_handling(self):
        # The use recorder should not fetch any images.
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.assertFalse(self.docker_client.inspect_image.called)

    def test_unfetchable_containers_ignored(self):
        self.events.append(MockEvent('create'))
        self.docker_client.inspect_container.side_effect = MockException(404)
        self.recorder.run()
        self.assertFalse(self.images.add_user.called)

    def test_ends_user_on_container_destroy(self):
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.images.end_user.assert_called_with(self.events[0]['id'])


class DockerImageCleanerTestCase(DockerImageUseRecorderTestCase):
    """Recorder tests rerun on DockerImageCleaner, plus deletion tests."""
    TEST_CLASS = cleaner.DockerImageCleaner

    def test_unknown_image_handling(self):
        # The image cleaner should fetch and record new images.
        self.images.has_image.return_value = False
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.docker_client.inspect_image.assert_called_with(
            self.docker_client.inspect_container()['Image'])
        self.images.add_image.assert_called_with(
            self.docker_client.inspect_image())

    def test_unfetchable_images_ignored(self):
        self.images.has_image.return_value = False
        self.docker_client.inspect_image.side_effect = MockException(404)
        self.events.append(MockEvent('create'))
        self.recorder.run()
        self.docker_client.inspect_image.assert_called_with(
            self.docker_client.inspect_container()['Image'])
        self.assertFalse(self.images.add_image.called)

    def test_no_deletions_when_containers_running(self):
        self.images.any_users.return_value = True
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.assertFalse(self.images.should_delete.called)
        self.assertFalse(self.docker_client.remove_image.called)

    def test_deletions_after_destroy(self):
        delete_id = MockDockerId()
        self.images.any_users.return_value = False
        self.images.should_delete.return_value = [delete_id]
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.docker_client.remove_image.assert_called_with(delete_id)
        self.images.del_image.assert_called_with(delete_id)

    def test_failed_deletion_handling(self):
        delete_id = MockDockerId()
        self.images.any_users.return_value = False
        self.images.should_delete.return_value = [delete_id]
        self.docker_client.remove_image.side_effect = MockException(500)
        self.events.append(MockEvent('destroy'))
        self.recorder.run()
        self.docker_client.remove_image.assert_called_with(delete_id)
        self.assertFalse(self.images.del_image.called)


class HumanSizeTestCase(unittest.TestCase):
    """Tests for cleaner.human_size suffix parsing."""

    def check(self, human_str, count, exp):
        # Expect `human_str` to parse as count * 1024 ** exp bytes.
        self.assertEqual(count * (1024 ** exp),
                         cleaner.human_size(human_str))

    def test_bytes(self):
        self.check('1', 1, 0)
        self.check('82', 82, 0)

    def test_kibibytes(self):
        self.check('2K', 2, 1)
        self.check('3k', 3, 1)

    def test_mebibytes(self):
        self.check('4M', 4, 2)
        self.check('5m', 5, 2)

    def test_gibibytes(self):
        self.check('6G', 6, 3)
        self.check('7g', 7, 3)

    def test_tebibytes(self):
        self.check('8T', 8, 4)
        self.check('9t', 9, 4)


class RunTestCase(unittest.TestCase):
    """Test that cleaner.run() requests contiguous event time windows."""

    def setUp(self):
        self.args = mock.MagicMock(name='args')
        self.args.keep = 1000000
        self.docker_client = mock.MagicMock(name='docker_client')

    def test_run(self):
        test_start_time = int(time.time())
        self.docker_client.events.return_value = []
        cleaner.run(self.args, self.docker_client)
        self.assertEqual(2, self.docker_client.events.call_count)
        event_kwargs = [args[1] for args in
                        self.docker_client.events.call_args_list]
        self.assertIn('since', event_kwargs[0])
        self.assertIn('until', event_kwargs[0])
        self.assertLessEqual(test_start_time, event_kwargs[0]['until'])
        self.assertIn('since', event_kwargs[1])
        # The live stream must start exactly where the history ended.
        self.assertEqual(event_kwargs[0]['until'], event_kwargs[1]['since'])