Merge branch '12479-wb-structured-vocabulary'
[arvados.git] / services / dockercleaner / arvados_docker / cleaner.py
1 #!/usr/bin/env python3
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: AGPL-3.0
5
6 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
7
8 Usage:
9   python3 -m arvados_docker.cleaner --quota 50G
10 """
11
12 import argparse
13 import collections
14 import copy
15 import functools
16 import json
17 import logging
18 import sys
19 import time
20
21 import docker
22 import json
23
24 DEFAULT_CONFIG_FILE = '/etc/arvados/docker-cleaner/docker-cleaner.json'
25
26 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
27
28 logger = logging.getLogger('arvados_docker.cleaner')
29
30
31 def return_when_docker_not_found(result=None):
32     # If the decorated function raises a 404 error from Docker, return
33     # `result` instead.
34     def docker_not_found_decorator(orig_func):
35         @functools.wraps(orig_func)
36         def docker_not_found_wrapper(*args, **kwargs):
37             try:
38                 return orig_func(*args, **kwargs)
39             except docker.errors.APIError as error:
40                 if error.response.status_code != 404:
41                     raise
42                 return result
43         return docker_not_found_wrapper
44     return docker_not_found_decorator
45
46
47 class DockerImage:
48
49     def __init__(self, image_hash):
50         self.docker_id = image_hash['Id']
51         self.size = image_hash['VirtualSize']
52         self.last_used = -1
53
54     def used_at(self, use_time):
55         self.last_used = max(self.last_used, use_time)
56
57
58 class DockerImages:
59
60     def __init__(self, target_size):
61         self.target_size = target_size
62         self.images = {}
63         self.container_image_map = {}
64
65     @classmethod
66     def from_daemon(cls, target_size, docker_client):
67         images = cls(target_size)
68         for image in docker_client.images():
69             images.add_image(image)
70         return images
71
72     def add_image(self, image_hash):
73         image = DockerImage(image_hash)
74         self.images[image.docker_id] = image
75         logger.debug("Registered image %s", image.docker_id)
76
77     def del_image(self, image_id):
78         if image_id in self.images:
79             del self.images[image_id]
80             self.container_image_map = {
81                 cid: cid_image
82                 for cid, cid_image in self.container_image_map.items()
83                 if cid_image != image_id}
84             logger.debug("Unregistered image %s", image_id)
85
86     def has_image(self, image_id):
87         return image_id in self.images
88
89     def add_user(self, container_hash, use_time):
90         image_id = container_hash['Image']
91         if image_id in self.images:
92             self.container_image_map[container_hash['Id']] = image_id
93             self.images[image_id].used_at(use_time)
94             logger.debug("Registered container %s using image %s",
95                          container_hash['Id'], image_id)
96
97     def end_user(self, cid):
98         self.container_image_map.pop(cid, None)
99         logger.debug("Unregistered container %s", cid)
100
101     def should_delete(self):
102         if not self.images:
103             return
104         # Build a list of images, ordered by use time.
105         lru_images = list(self.images.values())
106         lru_images.sort(key=lambda image: image.last_used)
107         # Make sure we don't delete any images in use, or if there are
108         # none, the most recently used image.
109         if self.container_image_map:
110             keep_ids = set(self.container_image_map.values())
111         else:
112             keep_ids = {lru_images[-1].docker_id}
113         space_left = (self.target_size - sum(self.images[image_id].size
114                                              for image_id in keep_ids))
115         # Go through the list most recently used first, and note which
116         # images can be saved with the space allotted.
117         for image in reversed(lru_images):
118             if (image.docker_id not in keep_ids) and (image.size <= space_left):
119                 keep_ids.add(image.docker_id)
120                 space_left -= image.size
121         # Yield the Docker IDs of any image we don't want to save, least
122         # recently used first.
123         for image in lru_images:
124             if image.docker_id not in keep_ids:
125                 yield image.docker_id
126
127
128 class DockerEventHandlers:
129     # This class maps Docker event types to the names of methods that should
130     # receive those events.
131
132     def __init__(self):
133         self.handler_names = collections.defaultdict(list)
134
135     def on(self, *status_names):
136         def register_handler(handler_method):
137             for status in status_names:
138                 self.handler_names[status].append(handler_method.__name__)
139             return handler_method
140         return register_handler
141
142     def for_event(self, status):
143         return iter(self.handler_names[status])
144
145     def copy(self):
146         result = self.__class__()
147         result.handler_names = copy.deepcopy(self.handler_names)
148         return result
149
150
151 class DockerEventListener:
152     # To use this class, define event_handlers as an instance of
153     # DockerEventHandlers.  Call run() to iterate over events and call the
154     # handler methods as they come in.
155     ENCODING = 'utf-8'
156
157     def __init__(self, events):
158         self.events = events
159
160     def run(self):
161         for event in self.events:
162             event = json.loads(event.decode(self.ENCODING))
163             if event.get('Type', 'container') != 'container':
164                 continue
165             for method_name in self.event_handlers.for_event(event.get('status')):
166                 getattr(self, method_name)(event)
167
168
169 class DockerImageUseRecorder(DockerEventListener):
170     event_handlers = DockerEventHandlers()
171
172     def __init__(self, images, docker_client, events):
173         self.images = images
174         self.docker_client = docker_client
175         super().__init__(events)
176
177     @event_handlers.on('create')
178     @return_when_docker_not_found()
179     def load_container(self, event):
180         container_hash = self.docker_client.inspect_container(event['id'])
181         self.new_container(event, container_hash)
182
183     def new_container(self, event, container_hash):
184         self.images.add_user(container_hash, event['time'])
185
186     @event_handlers.on('destroy')
187     def container_stopped(self, event):
188         self.images.end_user(event['id'])
189
190
191 class DockerImageCleaner(DockerImageUseRecorder):
192     event_handlers = DockerImageUseRecorder.event_handlers.copy()
193
194     def __init__(self, images, docker_client, events, remove_containers_onexit=False):
195         super().__init__(images, docker_client, events)
196         self.logged_unknown = set()
197         self.remove_containers_onexit = remove_containers_onexit
198
199     def new_container(self, event, container_hash):
200         container_image_id = container_hash['Image']
201         if not self.images.has_image(container_image_id):
202             image_hash = self.docker_client.inspect_image(container_image_id)
203             self.images.add_image(image_hash)
204         return super().new_container(event, container_hash)
205
206     def _remove_container(self, cid):
207         try:
208             self.docker_client.remove_container(cid, v=True)
209         except docker.errors.APIError as error:
210             logger.warning("Failed to remove container %s: %s", cid, error)
211         else:
212             logger.info("Removed container %s", cid)
213
214     @event_handlers.on('die')
215     def clean_container(self, event=None):
216         if self.remove_containers_onexit:
217             self._remove_container(event['id'])
218
219     def check_stopped_containers(self, remove=False):
220         logger.info("Checking for stopped containers")
221         for c in self.docker_client.containers(filters={'status': 'exited'}):
222             logger.info("Container %s %s", c['Id'], c['Status'])
223             if c['Status'][:6] != 'Exited':
224                 logger.error("Unexpected status %s for container %s",
225                              c['Status'], c['Id'])
226             elif remove:
227                 self._remove_container(c['Id'])
228
229     @event_handlers.on('destroy')
230     def clean_images(self, event=None):
231         for image_id in self.images.should_delete():
232             try:
233                 self.docker_client.remove_image(image_id)
234             except docker.errors.APIError as error:
235                 logger.warning(
236                     "Failed to remove image %s: %s", image_id, error)
237             else:
238                 logger.info("Removed image %s", image_id)
239                 self.images.del_image(image_id)
240
241     @event_handlers.on('destroy')
242     def log_unknown_images(self, event):
243         unknown_ids = {image['Id'] for image in self.docker_client.images()
244                        if not self.images.has_image(image['Id'])}
245         for image_id in (unknown_ids - self.logged_unknown):
246             logger.info(
247                 "Image %s is loaded but unused, so it won't be cleaned",
248                 image_id)
249         self.logged_unknown = unknown_ids
250
251
252 def human_size(size_str):
253     size_str = size_str.lower().rstrip('b')
254     multiplier = SUFFIX_SIZES.get(size_str[-1])
255     if multiplier is None:
256         multiplier = 1
257     else:
258         size_str = size_str[:-1]
259     return int(size_str) * multiplier
260
261
262 def load_config(arguments):
263     args = parse_arguments(arguments)
264
265     config = default_config()
266     try:
267         with open(args.config, 'r') as f:
268             c = json.load(f)
269             config.update(c)
270     except (FileNotFoundError, IOError, ValueError) as error:
271         if (isinstance(error, FileNotFoundError) and
272             args.config == DEFAULT_CONFIG_FILE):
273             logger.warning("DEPRECATED: default config file %s not found; "
274                            "relying on command line configuration",
275                            repr(DEFAULT_CONFIG_FILE))
276         else:
277             sys.exit('error reading config file {}: {}'.format(
278                 args.config, error))
279
280     configargs = vars(args).copy()
281     configargs.pop('config')
282     config.update({k: v for k, v in configargs.items() if v})
283
284     if isinstance(config['Quota'], str):
285         config['Quota'] = human_size(config['Quota'])
286
287     return config
288
289
290 def default_config():
291     return {
292         'Quota': '1G',
293         'RemoveStoppedContainers': 'always',
294         'Verbose': 0,
295     }
296
297
298 def parse_arguments(arguments):
299     class Formatter(argparse.ArgumentDefaultsHelpFormatter,
300                     argparse.RawDescriptionHelpFormatter):
301         pass
302     parser = argparse.ArgumentParser(
303         prog="arvados_docker.cleaner",
304         description="clean old Docker images from Arvados compute nodes",
305         epilog="Example config file:\n\n{}".format(
306             json.dumps(default_config(), indent=4)),
307         formatter_class=Formatter,
308     )
309     parser.add_argument(
310         '--config', action='store', type=str, default=DEFAULT_CONFIG_FILE,
311         help="configuration file")
312
313     deprecated = " (DEPRECATED -- use config file instead)"
314     parser.add_argument(
315         '--quota', action='store', type=human_size, dest='Quota',
316         help="space allowance for Docker images, suffixed with K/M/G/T" + deprecated)
317     parser.add_argument(
318         '--remove-stopped-containers', type=str, default='always', dest='RemoveStoppedContainers',
319         choices=['never', 'onexit', 'always'],
320         help="""when to remove stopped containers (default: always, i.e., remove
321         stopped containers found at startup, and remove containers as
322         soon as they exit)""" + deprecated)
323     parser.add_argument(
324         '--verbose', '-v', action='count', default=0, dest='Verbose',
325         help="log more information" + deprecated)
326
327     return parser.parse_args(arguments)
328
329
330 def setup_logging():
331     log_handler = logging.StreamHandler()
332     log_handler.setFormatter(logging.Formatter(
333         '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
334         '%Y-%m-%d %H:%M:%S'))
335     logger.addHandler(log_handler)
336
337
338 def configure_logging(config):
339     logger.setLevel(logging.ERROR - (10 * config['Verbose']))
340
341
342 def run(config, docker_client):
343     start_time = int(time.time())
344     logger.debug("Loading Docker activity through present")
345     images = DockerImages.from_daemon(config['Quota'], docker_client)
346     use_recorder = DockerImageUseRecorder(
347         images, docker_client, docker_client.events(since=1, until=start_time))
348     use_recorder.run()
349     cleaner = DockerImageCleaner(
350         images, docker_client, docker_client.events(since=start_time),
351         remove_containers_onexit=config['RemoveStoppedContainers'] != 'never')
352     cleaner.check_stopped_containers(
353         remove=config['RemoveStoppedContainers'] == 'always')
354     logger.info("Checking image quota at startup")
355     cleaner.clean_images()
356     logger.info("Listening for docker events")
357     cleaner.run()
358
359
360 def main(arguments=sys.argv[1:]):
361     setup_logging()
362     config = load_config(arguments)
363     configure_logging(config)
364     try:
365         run(config, docker.Client(version='1.14'))
366     except KeyboardInterrupt:
367         sys.exit(1)
368
369 if __name__ == '__main__':
370     main()