2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: AGPL-3.0
6 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
9 python3 -m arvados_docker.cleaner --quota 50G
# Configuration file that --config falls back to when it is not given.
DEFAULT_CONFIG_FILE = '/etc/arvados/docker-cleaner/docker-cleaner.json'

# Multiplier for each size suffix: successive powers of 1024 for k, m, g, t.
SUFFIX_SIZES = dict(zip('kmgt', (1024 ** power for power in range(1, 5))))

# Logger used throughout this module.
logger = logging.getLogger('arvados_docker.cleaner')
31 def return_when_docker_not_found(result=None):
32 # If the decorated function raises a 404 error from Docker, return
34 def docker_not_found_decorator(orig_func):
35 @functools.wraps(orig_func)
36 def docker_not_found_wrapper(*args, **kwargs):
38 return orig_func(*args, **kwargs)
39 except docker.errors.APIError as error:
40 if error.response.status_code != 404:
43 return docker_not_found_wrapper
44 return docker_not_found_decorator
49 def __init__(self, image_hash):
50 self.docker_id = image_hash['Id']
51 self.size = image_hash['VirtualSize']
def used_at(self, use_time):
    """Record that the image was in use at ``use_time``.

    ``last_used`` only ever moves forward: it is replaced when
    ``use_time`` is strictly greater than the stored value and is left
    alone otherwise.
    """
    if use_time > self.last_used:
        self.last_used = use_time
60 def __init__(self, target_size):
61 self.target_size = target_size
63 self.container_image_map = {}
66 def from_daemon(cls, target_size, docker_client):
67 images = cls(target_size)
68 for image in docker_client.images():
69 images.add_image(image)
def add_image(self, image_hash):
    """Wrap a Docker image record and start tracking it.

    ``image_hash`` is handed to ``DockerImage``; the resulting object is
    stored in ``self.images`` under its ``docker_id``, replacing any
    entry already stored under that ID.
    """
    tracked = DockerImage(image_hash)
    new_id = tracked.docker_id
    self.images[new_id] = tracked
    logger.debug("Registered image %s", new_id)
77 def del_image(self, image_id):
78 if image_id in self.images:
79 del self.images[image_id]
80 self.container_image_map = {
82 for cid, cid_image in self.container_image_map.items()
83 if cid_image != image_id}
84 logger.debug("Unregistered image %s", image_id)
def has_image(self, image_id):
    """Return True when ``image_id`` is currently registered in ``self.images``."""
    known_images = self.images
    return image_id in known_images
def add_user(self, container_hash, use_time):
    """Note that a container is using one of the tracked images.

    Containers whose image is not in ``self.images`` are ignored.
    Otherwise the container ID is mapped to its image ID in
    ``self.container_image_map`` and the image's last-use time is
    updated through ``used_at(use_time)``.
    """
    image_id = container_hash['Image']
    if image_id not in self.images:
        return
    container_id = container_hash['Id']
    self.container_image_map[container_id] = image_id
    self.images[image_id].used_at(use_time)
    # NOTE(review): assumes the debug message is only emitted for tracked
    # images -- confirm against the original layout.
    logger.debug("Registered container %s using image %s",
                 container_id, image_id)
def end_user(self, cid):
    """Forget that container ``cid`` is using an image.

    A container ID that was never registered is tolerated silently; the
    debug message is logged in either case.
    """
    try:
        del self.container_image_map[cid]
    except KeyError:
        # Nothing was recorded for this container; that is fine.
        pass
    logger.debug("Unregistered container %s", cid)
101 def should_delete(self):
104 # Build a list of images, ordered by use time.
105 lru_images = list(self.images.values())
106 lru_images.sort(key=lambda image: image.last_used)
107 # Make sure we don't delete any images in use, or if there are
108 # none, the most recently used image.
109 if self.container_image_map:
110 keep_ids = set(self.container_image_map.values())
112 keep_ids = {lru_images[-1].docker_id}
113 space_left = (self.target_size - sum(self.images[image_id].size
114 for image_id in keep_ids))
115 # Go through the list most recently used first, and note which
116 # images can be saved with the space allotted.
117 for image in reversed(lru_images):
118 if (image.docker_id not in keep_ids) and (image.size <= space_left):
119 keep_ids.add(image.docker_id)
120 space_left -= image.size
121 # Yield the Docker IDs of any image we don't want to save, least
122 # recently used first.
123 for image in lru_images:
124 if image.docker_id not in keep_ids:
125 yield image.docker_id
128 class DockerEventHandlers:
129 # This class maps Docker event types to the names of methods that should
130 # receive those events.
133 self.handler_names = collections.defaultdict(list)
def on(self, *status_names):
    """Build a decorator that subscribes a method to Docker event statuses.

    The decorator appends the decorated function's ``__name__`` to
    ``self.handler_names`` once for every entry in ``status_names`` and
    hands the function back unchanged.
    """
    def register_handler(handler_method):
        for event_status in status_names:
            subscribers = self.handler_names[event_status]
            subscribers.append(handler_method.__name__)
        return handler_method
    return register_handler
def for_event(self, status):
    """Return an iterator over the handler names registered for ``status``."""
    registered = self.handler_names[status]
    return iter(registered)
146 result = self.__class__()
147 result.handler_names = copy.deepcopy(self.handler_names)
151 class DockerEventListener:
152 # To use this class, define event_handlers as an instance of
153 # DockerEventHandlers. Call run() to iterate over events and call the
154 # handler methods as they come in.
157 def __init__(self, events):
161 for event in self.events:
162 event = json.loads(event.decode(self.ENCODING))
163 if event.get('Type', 'container') != 'container':
165 for method_name in self.event_handlers.for_event(event.get('status')):
166 getattr(self, method_name)(event)
169 class DockerImageUseRecorder(DockerEventListener):
170 event_handlers = DockerEventHandlers()
172 def __init__(self, images, docker_client, events):
174 self.docker_client = docker_client
175 super().__init__(events)
@event_handlers.on('create')
@return_when_docker_not_found()
def load_container(self, event):
    """Handle a ``create`` event by inspecting the new container.

    The container named by ``event['id']`` is looked up through
    ``self.docker_client.inspect_container`` and the result is passed,
    together with the event, to ``new_container``.
    """
    inspect = self.docker_client.inspect_container
    details = inspect(event['id'])
    self.new_container(event, details)
def new_container(self, event, container_hash):
    """Register ``container_hash`` as an image user at ``event['time']``."""
    register_use = self.images.add_user
    register_use(container_hash, event['time'])
@event_handlers.on('destroy')
def container_stopped(self, event):
    """Handle a ``destroy`` event by dropping the container's image claim."""
    release = self.images.end_user
    release(event['id'])
191 class DockerImageCleaner(DockerImageUseRecorder):
192 event_handlers = DockerImageUseRecorder.event_handlers.copy()
def __init__(self, images, docker_client, events, remove_containers_onexit=False):
    """Set up the cleaner on top of the use recorder.

    ``images``, ``docker_client`` and ``events`` are forwarded to the
    parent constructor. ``remove_containers_onexit`` is stored for the
    ``die`` handler, and ``logged_unknown`` starts as an empty set.
    """
    super().__init__(images, docker_client, events)
    self.remove_containers_onexit = remove_containers_onexit
    self.logged_unknown = set()
def new_container(self, event, container_hash):
    """Make sure the container's image is tracked, then record the use.

    When ``self.images`` does not know the container's image yet, the
    image is inspected through the Docker client and registered first.
    The parent implementation is then called and its result returned.
    """
    image_id = container_hash['Image']
    already_tracked = self.images.has_image(image_id)
    if not already_tracked:
        inspected = self.docker_client.inspect_image(image_id)
        self.images.add_image(inspected)
    return super().new_container(event, container_hash)
206 def _remove_container(self, cid):
208 self.docker_client.remove_container(cid, v=True)
209 except docker.errors.APIError as error:
210 logger.warning("Failed to remove container %s: %s", cid, error)
212 logger.info("Removed container %s", cid)
@event_handlers.on('die')
def clean_container(self, event=None):
    """Handle a ``die`` event; remove the container when so configured.

    Does nothing unless ``self.remove_containers_onexit`` is truthy.
    """
    if not self.remove_containers_onexit:
        return
    self._remove_container(event['id'])
219 def check_stopped_containers(self, remove=False):
220 logger.info("Checking for stopped containers")
221 for c in self.docker_client.containers(filters={'status': 'exited'}):
222 logger.info("Container %s %s", c['Id'], c['Status'])
223 if c['Status'][:6] != 'Exited':
224 logger.error("Unexpected status %s for container %s",
225 c['Status'], c['Id'])
227 self._remove_container(c['Id'])
229 @event_handlers.on('destroy')
230 def clean_images(self, event=None):
231 for image_id in self.images.should_delete():
233 self.docker_client.remove_image(image_id)
234 except docker.errors.APIError as error:
236 "Failed to remove image %s: %s", image_id, error)
238 logger.info("Removed image %s", image_id)
239 self.images.del_image(image_id)
241 @event_handlers.on('destroy')
242 def log_unknown_images(self, event):
243 unknown_ids = {image['Id'] for image in self.docker_client.images()
244 if not self.images.has_image(image['Id'])}
245 for image_id in (unknown_ids - self.logged_unknown):
247 "Image %s is loaded but unused, so it won't be cleaned",
249 self.logged_unknown = unknown_ids
252 def human_size(size_str):
253 size_str = size_str.lower().rstrip('b')
254 multiplier = SUFFIX_SIZES.get(size_str[-1])
255 if multiplier is None:
258 size_str = size_str[:-1]
259 return int(size_str) * multiplier
262 def load_config(arguments):
263 args = parse_arguments(arguments)
265 config = default_config()
267 with open(args.config, 'r') as f:
270 except (FileNotFoundError, IOError, ValueError) as error:
271 if (isinstance(error, FileNotFoundError) and
272 args.config == DEFAULT_CONFIG_FILE):
273 logger.warning("DEPRECATED: default config file %s not found; "
274 "relying on command line configuration",
275 repr(DEFAULT_CONFIG_FILE))
277 sys.exit('error reading config file {}: {}'.format(
280 configargs = vars(args).copy()
281 configargs.pop('config')
282 config.update({k: v for k, v in configargs.items() if v})
284 if isinstance(config['Quota'], str):
285 config['Quota'] = human_size(config['Quota'])
290 def default_config():
293 'RemoveStoppedContainers': 'always',
298 def parse_arguments(arguments):
299 class Formatter(argparse.ArgumentDefaultsHelpFormatter,
300 argparse.RawDescriptionHelpFormatter):
302 parser = argparse.ArgumentParser(
303 prog="arvados_docker.cleaner",
304 description="clean old Docker images from Arvados compute nodes",
305 epilog="Example config file:\n\n{}".format(
306 json.dumps(default_config(), indent=4)),
307 formatter_class=Formatter,
310 '--config', action='store', type=str, default=DEFAULT_CONFIG_FILE,
311 help="configuration file")
313 deprecated = " (DEPRECATED -- use config file instead)"
315 '--quota', action='store', type=human_size, dest='Quota',
316 help="space allowance for Docker images, suffixed with K/M/G/T" + deprecated)
318 '--remove-stopped-containers', type=str, default='always', dest='RemoveStoppedContainers',
319 choices=['never', 'onexit', 'always'],
320 help="""when to remove stopped containers (default: always, i.e., remove
321 stopped containers found at startup, and remove containers as
322 soon as they exit)""" + deprecated)
324 '--verbose', '-v', action='count', default=0, dest='Verbose',
325 help="log more information" + deprecated)
327 return parser.parse_args(arguments)
331 log_handler = logging.StreamHandler()
332 log_handler.setFormatter(logging.Formatter(
333 '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
334 '%Y-%m-%d %H:%M:%S'))
335 logger.addHandler(log_handler)
def configure_logging(config):
    """Set the module logger's level from ``config['Verbose']``.

    Each verbosity step lowers the threshold by one standard level,
    starting from ERROR: 0 -> ERROR, 1 -> WARNING, 2 -> INFO, 3 -> DEBUG.
    Higher counts stay at DEBUG.
    """
    level = logging.ERROR - (10 * config['Verbose'])
    # A level of 0 is logging.NOTSET, which makes the logger defer to its
    # parent's level instead of logging everything, so a fourth -v used to
    # produce *less* output than the third. Never go below DEBUG.
    logger.setLevel(max(logging.DEBUG, level))
342 def run(config, docker_client):
343 start_time = int(time.time())
344 logger.debug("Loading Docker activity through present")
345 images = DockerImages.from_daemon(config['Quota'], docker_client)
346 use_recorder = DockerImageUseRecorder(
347 images, docker_client, docker_client.events(since=1, until=start_time))
349 cleaner = DockerImageCleaner(
350 images, docker_client, docker_client.events(since=start_time),
351 remove_containers_onexit=config['RemoveStoppedContainers'] != 'never')
352 cleaner.check_stopped_containers(
353 remove=config['RemoveStoppedContainers'] == 'always')
354 logger.info("Checking image quota at startup")
355 cleaner.clean_images()
356 logger.info("Listening for docker events")
360 def main(arguments=sys.argv[1:]):
362 config = load_config(arguments)
363 configure_logging(config)
365 run(config, docker.APIClient(version='1.35'))
366 except KeyboardInterrupt:
369 if __name__ == '__main__':