5531bf5ec42f7e95d26ed3da394f095b7346f76f
[arvados.git] / services / dockercleaner / arvados_docker / cleaner.py
1 #!/usr/bin/env python3
2 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
3
4 Usage:
5   python3 -m arvados_docker.cleaner --quota 50G
6 """
7
8 import argparse
9 import collections
10 import copy
11 import functools
12 import json
13 import logging
14 import sys
15 import time
16
17 import docker
18 import json
19
20 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
21
22 logger = logging.getLogger('arvados_docker.cleaner')
23
24 def return_when_docker_not_found(result=None):
25     # If the decorated function raises a 404 error from Docker, return
26     # `result` instead.
27     def docker_not_found_decorator(orig_func):
28         @functools.wraps(orig_func)
29         def docker_not_found_wrapper(*args, **kwargs):
30             try:
31                 return orig_func(*args, **kwargs)
32             except docker.errors.APIError as error:
33                 if error.response.status_code != 404:
34                     raise
35                 return result
36         return docker_not_found_wrapper
37     return docker_not_found_decorator
38
39 class DockerImage:
40     def __init__(self, image_hash):
41         self.docker_id = image_hash['Id']
42         self.size = image_hash['VirtualSize']
43         self.last_used = -1
44
45     def used_at(self, use_time):
46         self.last_used = max(self.last_used, use_time)
47
48
49 class DockerImages:
50     def __init__(self, target_size):
51         self.target_size = target_size
52         self.images = {}
53         self.container_image_map = {}
54
55     @classmethod
56     def from_daemon(cls, target_size, docker_client):
57         images = cls(target_size)
58         for image in docker_client.images():
59             images.add_image(image)
60         return images
61
62     def add_image(self, image_hash):
63         image = DockerImage(image_hash)
64         self.images[image.docker_id] = image
65         logger.debug("Registered image %s", image.docker_id)
66
67     def del_image(self, image_id):
68         if image_id in self.images:
69             del self.images[image_id]
70             self.container_image_map = {
71                 cid: cid_image
72                 for cid, cid_image in self.container_image_map.items()
73                 if cid_image != image_id}
74             logger.debug("Unregistered image %s", image_id)
75
76     def has_image(self, image_id):
77         return image_id in self.images
78
79     def add_user(self, container_hash, use_time):
80         image_id = container_hash['Image']
81         if image_id in self.images:
82             self.container_image_map[container_hash['Id']] = image_id
83             self.images[image_id].used_at(use_time)
84             logger.debug("Registered container %s using image %s",
85                          container_hash['Id'], image_id)
86
87     def end_user(self, cid):
88         self.container_image_map.pop(cid, None)
89         logger.debug("Unregistered container %s", cid)
90
91     def should_delete(self):
92         if not self.images:
93             return
94         # Build a list of images, ordered by use time.
95         lru_images = list(self.images.values())
96         lru_images.sort(key=lambda image: image.last_used)
97         # Make sure we don't delete any images in use, or if there are
98         # none, the most recently used image.
99         if self.container_image_map:
100             keep_ids = set(self.container_image_map.values())
101         else:
102             keep_ids = {lru_images[-1].docker_id}
103         space_left = (self.target_size - sum(self.images[image_id].size
104                                              for image_id in keep_ids))
105         # Go through the list most recently used first, and note which
106         # images can be saved with the space allotted.
107         for image in reversed(lru_images):
108             if (image.docker_id not in keep_ids) and (image.size <= space_left):
109                 keep_ids.add(image.docker_id)
110                 space_left -= image.size
111         # Yield the Docker IDs of any image we don't want to save, least
112         # recently used first.
113         for image in lru_images:
114             if image.docker_id not in keep_ids:
115                 yield image.docker_id
116
117
118 class DockerEventHandlers:
119     # This class maps Docker event types to the names of methods that should
120     # receive those events.
121     def __init__(self):
122         self.handler_names = collections.defaultdict(list)
123
124     def on(self, *status_names):
125         def register_handler(handler_method):
126             for status in status_names:
127                 self.handler_names[status].append(handler_method.__name__)
128             return handler_method
129         return register_handler
130
131     def for_event(self, status):
132         return iter(self.handler_names[status])
133
134     def copy(self):
135         result = self.__class__()
136         result.handler_names = copy.deepcopy(self.handler_names)
137         return result
138
139
140 class DockerEventListener:
141     # To use this class, define event_handlers as an instance of
142     # DockerEventHandlers.  Call run() to iterate over events and call the
143     # handler methods as they come in.
144     ENCODING = 'utf-8'
145
146     def __init__(self, events):
147         self.events = events
148
149     def run(self):
150         for event in self.events:
151             event = json.loads(event.decode(self.ENCODING))
152             if event.get('Type', 'container') != 'container':
153                 continue
154             for method_name in self.event_handlers.for_event(event.get('status')):
155                 getattr(self, method_name)(event)
156
157
158 class DockerImageUseRecorder(DockerEventListener):
159     event_handlers = DockerEventHandlers()
160
161     def __init__(self, images, docker_client, events):
162         self.images = images
163         self.docker_client = docker_client
164         super().__init__(events)
165
166     @event_handlers.on('create')
167     @return_when_docker_not_found()
168     def load_container(self, event):
169         container_hash = self.docker_client.inspect_container(event['id'])
170         self.new_container(event, container_hash)
171
172     def new_container(self, event, container_hash):
173         self.images.add_user(container_hash, event['time'])
174
175     @event_handlers.on('destroy')
176     def container_stopped(self, event):
177         self.images.end_user(event['id'])
178
179
180 class DockerImageCleaner(DockerImageUseRecorder):
181     event_handlers = DockerImageUseRecorder.event_handlers.copy()
182
183     def __init__(self, images, docker_client, events, remove_containers_onexit=False):
184         super().__init__(images, docker_client, events)
185         self.logged_unknown = set()
186         self.remove_containers_onexit = remove_containers_onexit
187
188     def new_container(self, event, container_hash):
189         container_image_id = container_hash['Image']
190         if not self.images.has_image(container_image_id):
191             image_hash = self.docker_client.inspect_image(container_image_id)
192             self.images.add_image(image_hash)
193         return super().new_container(event, container_hash)
194
195     def _remove_container(self, cid):
196         try:
197             self.docker_client.remove_container(cid, v=True)
198         except docker.errors.APIError as error:
199             logger.warning("Failed to remove container %s: %s", cid, error)
200         else:
201             logger.info("Removed container %s", cid)
202
203     @event_handlers.on('die')
204     def clean_container(self, event=None):
205         if self.remove_containers_onexit:
206             self._remove_container(event['id'])
207
208     def check_stopped_containers(self, remove=False):
209         logger.info("Checking for stopped containers")
210         for c in self.docker_client.containers(filters={'status': 'exited'}):
211             logger.info("Container %s %s", c['Id'], c['Status'])
212             if c['Status'][:6] != 'Exited':
213                 logger.error("Unexpected status %s for container %s",
214                              c['Status'], c['Id'])
215             elif remove:
216                 self._remove_container(c['Id'])
217
218     @event_handlers.on('destroy')
219     def clean_images(self, event=None):
220         for image_id in self.images.should_delete():
221             try:
222                 self.docker_client.remove_image(image_id)
223             except docker.errors.APIError as error:
224                 logger.warning("Failed to remove image %s: %s", image_id, error)
225             else:
226                 logger.info("Removed image %s", image_id)
227                 self.images.del_image(image_id)
228
229     @event_handlers.on('destroy')
230     def log_unknown_images(self, event):
231         unknown_ids = {image['Id'] for image in self.docker_client.images()
232                        if not self.images.has_image(image['Id'])}
233         for image_id in (unknown_ids - self.logged_unknown):
234             logger.info("Image %s is loaded but unused, so it won't be cleaned",
235                         image_id)
236         self.logged_unknown = unknown_ids
237
238
239 def human_size(size_str):
240     size_str = size_str.lower().rstrip('b')
241     multiplier = SUFFIX_SIZES.get(size_str[-1])
242     if multiplier is None:
243         multiplier = 1
244     else:
245         size_str = size_str[:-1]
246     return int(size_str) * multiplier
247
248 def load_config(arguments):
249     args = parse_arguments(arguments)
250
251     config = default_config()
252     with open(args.config, 'r') as f:
253         config.update(json.load(f))
254
255     configargs = vars(args).copy()
256     configargs.pop('config')
257     config.update({k: v for k, v in configargs.items() if v})
258
259     if isinstance(config['Quota'], str):
260         config['Quota'] = human_size(config['Quota'])
261
262     return config
263
264 def default_config():
265     return {
266         'Quota': '1G',
267         'RemoveStoppedContainers': 'always',
268         'Verbose': 0,
269     }
270
271 def parse_arguments(arguments):
272     class Formatter(argparse.ArgumentDefaultsHelpFormatter,
273                     argparse.RawDescriptionHelpFormatter):
274         pass
275     parser = argparse.ArgumentParser(
276         prog="arvados_docker.cleaner",
277         description="clean old Docker images from Arvados compute nodes",
278         epilog="Example config file:\n\n{}".format(
279             json.dumps(default_config(), indent=4)),
280         formatter_class=Formatter,
281     )
282     parser.add_argument(
283         '--config', action='store', type=str, default='/etc/arvados/dockercleaner/config.json',
284         help="configuration file")
285
286     deprecated = " (DEPRECATED -- use config file instead)"
287     parser.add_argument(
288         '--quota', action='store', type=human_size, dest='Quota',
289         help="space allowance for Docker images, suffixed with K/M/G/T" + deprecated)
290     parser.add_argument(
291         '--remove-stopped-containers', type=str, default='always', dest='RemoveStoppedContainers',
292         choices=['never', 'onexit', 'always'],
293         help="""when to remove stopped containers (default: always, i.e., remove
294         stopped containers found at startup, and remove containers as
295         soon as they exit)""" + deprecated)
296     parser.add_argument(
297         '--verbose', '-v', action='count', default=0, dest='Verbose',
298         help="log more information" + deprecated)
299
300     return parser.parse_args(arguments)
301
302 def setup_logging(config):
303     log_handler = logging.StreamHandler()
304     log_handler.setFormatter(logging.Formatter(
305             '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
306             '%Y-%m-%d %H:%M:%S'))
307     logger.addHandler(log_handler)
308     logger.setLevel(logging.ERROR - (10 * config['Verbose']))
309
310 def run(config, docker_client):
311     start_time = int(time.time())
312     logger.debug("Loading Docker activity through present")
313     images = DockerImages.from_daemon(config['Quota'], docker_client)
314     use_recorder = DockerImageUseRecorder(
315         images, docker_client, docker_client.events(since=1, until=start_time))
316     use_recorder.run()
317     cleaner = DockerImageCleaner(
318         images, docker_client, docker_client.events(since=start_time),
319         remove_containers_onexit=config['RemoveStoppedContainers'] != 'never')
320     cleaner.check_stopped_containers(
321         remove=config['RemoveStoppedContainers'] == 'always')
322     logger.info("Checking image quota at startup")
323     cleaner.clean_images()
324     logger.info("Listening for docker events")
325     cleaner.run()
326
327 def main(arguments=sys.argv[1:]):
328     config = load_config(arguments)
329     setup_logging(config)
330     try:
331         run(config, docker.Client(version='1.14'))
332     except KeyboardInterrupt:
333         sys.exit(1)
334
335 if __name__ == '__main__':
336     main()