8b8c772e542adfb6decaa7be501dcafab8db91a5
[arvados.git] / services / dockercleaner / arvados_docker / cleaner.py
1 #!/usr/bin/env python3
2 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
3
4 Usage:
5   python3 -m arvados_docker.cleaner --quota 50G
6 """
7
8 import argparse
9 import collections
10 import copy
11 import functools
12 import json
13 import logging
14 import sys
15 import time
16
17 import docker
18 import json
19
20 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
21
22 logger = logging.getLogger('arvados_docker.cleaner')
23
24
25 def return_when_docker_not_found(result=None):
26     # If the decorated function raises a 404 error from Docker, return
27     # `result` instead.
28     def docker_not_found_decorator(orig_func):
29         @functools.wraps(orig_func)
30         def docker_not_found_wrapper(*args, **kwargs):
31             try:
32                 return orig_func(*args, **kwargs)
33             except docker.errors.APIError as error:
34                 if error.response.status_code != 404:
35                     raise
36                 return result
37         return docker_not_found_wrapper
38     return docker_not_found_decorator
39
40
41 class DockerImage:
42
43     def __init__(self, image_hash):
44         self.docker_id = image_hash['Id']
45         self.size = image_hash['VirtualSize']
46         self.last_used = -1
47
48     def used_at(self, use_time):
49         self.last_used = max(self.last_used, use_time)
50
51
52 class DockerImages:
53
54     def __init__(self, target_size):
55         self.target_size = target_size
56         self.images = {}
57         self.container_image_map = {}
58
59     @classmethod
60     def from_daemon(cls, target_size, docker_client):
61         images = cls(target_size)
62         for image in docker_client.images():
63             images.add_image(image)
64         return images
65
66     def add_image(self, image_hash):
67         image = DockerImage(image_hash)
68         self.images[image.docker_id] = image
69         logger.debug("Registered image %s", image.docker_id)
70
71     def del_image(self, image_id):
72         if image_id in self.images:
73             del self.images[image_id]
74             self.container_image_map = {
75                 cid: cid_image
76                 for cid, cid_image in self.container_image_map.items()
77                 if cid_image != image_id}
78             logger.debug("Unregistered image %s", image_id)
79
80     def has_image(self, image_id):
81         return image_id in self.images
82
83     def add_user(self, container_hash, use_time):
84         image_id = container_hash['Image']
85         if image_id in self.images:
86             self.container_image_map[container_hash['Id']] = image_id
87             self.images[image_id].used_at(use_time)
88             logger.debug("Registered container %s using image %s",
89                          container_hash['Id'], image_id)
90
91     def end_user(self, cid):
92         self.container_image_map.pop(cid, None)
93         logger.debug("Unregistered container %s", cid)
94
95     def should_delete(self):
96         if not self.images:
97             return
98         # Build a list of images, ordered by use time.
99         lru_images = list(self.images.values())
100         lru_images.sort(key=lambda image: image.last_used)
101         # Make sure we don't delete any images in use, or if there are
102         # none, the most recently used image.
103         if self.container_image_map:
104             keep_ids = set(self.container_image_map.values())
105         else:
106             keep_ids = {lru_images[-1].docker_id}
107         space_left = (self.target_size - sum(self.images[image_id].size
108                                              for image_id in keep_ids))
109         # Go through the list most recently used first, and note which
110         # images can be saved with the space allotted.
111         for image in reversed(lru_images):
112             if (image.docker_id not in keep_ids) and (image.size <= space_left):
113                 keep_ids.add(image.docker_id)
114                 space_left -= image.size
115         # Yield the Docker IDs of any image we don't want to save, least
116         # recently used first.
117         for image in lru_images:
118             if image.docker_id not in keep_ids:
119                 yield image.docker_id
120
121
122 class DockerEventHandlers:
123     # This class maps Docker event types to the names of methods that should
124     # receive those events.
125
126     def __init__(self):
127         self.handler_names = collections.defaultdict(list)
128
129     def on(self, *status_names):
130         def register_handler(handler_method):
131             for status in status_names:
132                 self.handler_names[status].append(handler_method.__name__)
133             return handler_method
134         return register_handler
135
136     def for_event(self, status):
137         return iter(self.handler_names[status])
138
139     def copy(self):
140         result = self.__class__()
141         result.handler_names = copy.deepcopy(self.handler_names)
142         return result
143
144
145 class DockerEventListener:
146     # To use this class, define event_handlers as an instance of
147     # DockerEventHandlers.  Call run() to iterate over events and call the
148     # handler methods as they come in.
149     ENCODING = 'utf-8'
150
151     def __init__(self, events):
152         self.events = events
153
154     def run(self):
155         for event in self.events:
156             event = json.loads(event.decode(self.ENCODING))
157             if event.get('Type', 'container') != 'container':
158                 continue
159             for method_name in self.event_handlers.for_event(event.get('status')):
160                 getattr(self, method_name)(event)
161
162
163 class DockerImageUseRecorder(DockerEventListener):
164     event_handlers = DockerEventHandlers()
165
166     def __init__(self, images, docker_client, events):
167         self.images = images
168         self.docker_client = docker_client
169         super().__init__(events)
170
171     @event_handlers.on('create')
172     @return_when_docker_not_found()
173     def load_container(self, event):
174         container_hash = self.docker_client.inspect_container(event['id'])
175         self.new_container(event, container_hash)
176
177     def new_container(self, event, container_hash):
178         self.images.add_user(container_hash, event['time'])
179
180     @event_handlers.on('destroy')
181     def container_stopped(self, event):
182         self.images.end_user(event['id'])
183
184
185 class DockerImageCleaner(DockerImageUseRecorder):
186     event_handlers = DockerImageUseRecorder.event_handlers.copy()
187
188     def __init__(self, images, docker_client, events, remove_containers_onexit=False):
189         super().__init__(images, docker_client, events)
190         self.logged_unknown = set()
191         self.remove_containers_onexit = remove_containers_onexit
192
193     def new_container(self, event, container_hash):
194         container_image_id = container_hash['Image']
195         if not self.images.has_image(container_image_id):
196             image_hash = self.docker_client.inspect_image(container_image_id)
197             self.images.add_image(image_hash)
198         return super().new_container(event, container_hash)
199
200     def _remove_container(self, cid):
201         try:
202             self.docker_client.remove_container(cid, v=True)
203         except docker.errors.APIError as error:
204             logger.warning("Failed to remove container %s: %s", cid, error)
205         else:
206             logger.info("Removed container %s", cid)
207
208     @event_handlers.on('die')
209     def clean_container(self, event=None):
210         if self.remove_containers_onexit:
211             self._remove_container(event['id'])
212
213     def check_stopped_containers(self, remove=False):
214         logger.info("Checking for stopped containers")
215         for c in self.docker_client.containers(filters={'status': 'exited'}):
216             logger.info("Container %s %s", c['Id'], c['Status'])
217             if c['Status'][:6] != 'Exited':
218                 logger.error("Unexpected status %s for container %s",
219                              c['Status'], c['Id'])
220             elif remove:
221                 self._remove_container(c['Id'])
222
223     @event_handlers.on('destroy')
224     def clean_images(self, event=None):
225         for image_id in self.images.should_delete():
226             try:
227                 self.docker_client.remove_image(image_id)
228             except docker.errors.APIError as error:
229                 logger.warning(
230                     "Failed to remove image %s: %s", image_id, error)
231             else:
232                 logger.info("Removed image %s", image_id)
233                 self.images.del_image(image_id)
234
235     @event_handlers.on('destroy')
236     def log_unknown_images(self, event):
237         unknown_ids = {image['Id'] for image in self.docker_client.images()
238                        if not self.images.has_image(image['Id'])}
239         for image_id in (unknown_ids - self.logged_unknown):
240             logger.info(
241                 "Image %s is loaded but unused, so it won't be cleaned",
242                 image_id)
243         self.logged_unknown = unknown_ids
244
245
246 def human_size(size_str):
247     size_str = size_str.lower().rstrip('b')
248     multiplier = SUFFIX_SIZES.get(size_str[-1])
249     if multiplier is None:
250         multiplier = 1
251     else:
252         size_str = size_str[:-1]
253     return int(size_str) * multiplier
254
255
256 def load_config(arguments):
257     args = parse_arguments(arguments)
258
259     config = default_config()
260     try:
261         with open(args.config, 'r') as f:
262             c = json.load(f)
263             config.update(c)
264     except (FileNotFoundError, IOError, ValueError) as error:
265         sys.exit('error reading config file {}: {}'.format(args.config, error))
266
267     configargs = vars(args).copy()
268     configargs.pop('config')
269     config.update({k: v for k, v in configargs.items() if v})
270
271     if isinstance(config['Quota'], str):
272         config['Quota'] = human_size(config['Quota'])
273
274     return config
275
276
277 def default_config():
278     return {
279         'Quota': '1G',
280         'RemoveStoppedContainers': 'always',
281         'Verbose': 0,
282     }
283
284
285 def parse_arguments(arguments):
286     class Formatter(argparse.ArgumentDefaultsHelpFormatter,
287                     argparse.RawDescriptionHelpFormatter):
288         pass
289     parser = argparse.ArgumentParser(
290         prog="arvados_docker.cleaner",
291         description="clean old Docker images from Arvados compute nodes",
292         epilog="Example config file:\n\n{}".format(
293             json.dumps(default_config(), indent=4)),
294         formatter_class=Formatter,
295     )
296     parser.add_argument(
297         '--config', action='store', type=str, default='/etc/arvados/docker-cleaner/config.json',
298         help="configuration file")
299
300     deprecated = " (DEPRECATED -- use config file instead)"
301     parser.add_argument(
302         '--quota', action='store', type=human_size, dest='Quota',
303         help="space allowance for Docker images, suffixed with K/M/G/T" + deprecated)
304     parser.add_argument(
305         '--remove-stopped-containers', type=str, default='always', dest='RemoveStoppedContainers',
306         choices=['never', 'onexit', 'always'],
307         help="""when to remove stopped containers (default: always, i.e., remove
308         stopped containers found at startup, and remove containers as
309         soon as they exit)""" + deprecated)
310     parser.add_argument(
311         '--verbose', '-v', action='count', default=0, dest='Verbose',
312         help="log more information" + deprecated)
313
314     return parser.parse_args(arguments)
315
316
317 def setup_logging(config):
318     log_handler = logging.StreamHandler()
319     log_handler.setFormatter(logging.Formatter(
320         '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
321         '%Y-%m-%d %H:%M:%S'))
322     logger.addHandler(log_handler)
323     logger.setLevel(logging.ERROR - (10 * config['Verbose']))
324
325
326 def run(config, docker_client):
327     start_time = int(time.time())
328     logger.debug("Loading Docker activity through present")
329     images = DockerImages.from_daemon(config['Quota'], docker_client)
330     use_recorder = DockerImageUseRecorder(
331         images, docker_client, docker_client.events(since=1, until=start_time))
332     use_recorder.run()
333     cleaner = DockerImageCleaner(
334         images, docker_client, docker_client.events(since=start_time),
335         remove_containers_onexit=config['RemoveStoppedContainers'] != 'never')
336     cleaner.check_stopped_containers(
337         remove=config['RemoveStoppedContainers'] == 'always')
338     logger.info("Checking image quota at startup")
339     cleaner.clean_images()
340     logger.info("Listening for docker events")
341     cleaner.run()
342
343
344 def main(arguments=sys.argv[1:]):
345     config = load_config(arguments)
346     setup_logging(config)
347     try:
348         run(config, docker.Client(version='1.14'))
349     except KeyboardInterrupt:
350         sys.exit(1)
351
352 if __name__ == '__main__':
353     main()