11901: Add /_health/ping and /_health/db health checks.
[arvados.git] / services / dockercleaner / arvados_docker / cleaner.py
1 #!/usr/bin/env python3
2 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
3
4 Usage:
5   python3 -m arvados_docker.cleaner --quota 50G
6 """
7
8 import argparse
9 import collections
10 import copy
11 import functools
12 import json
13 import logging
14 import sys
15 import time
16
17 import docker
18 import json
19
20 DEFAULT_CONFIG_FILE = '/etc/arvados/docker-cleaner/docker-cleaner.json'
21
22 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
23
24 logger = logging.getLogger('arvados_docker.cleaner')
25
26
27 def return_when_docker_not_found(result=None):
28     # If the decorated function raises a 404 error from Docker, return
29     # `result` instead.
30     def docker_not_found_decorator(orig_func):
31         @functools.wraps(orig_func)
32         def docker_not_found_wrapper(*args, **kwargs):
33             try:
34                 return orig_func(*args, **kwargs)
35             except docker.errors.APIError as error:
36                 if error.response.status_code != 404:
37                     raise
38                 return result
39         return docker_not_found_wrapper
40     return docker_not_found_decorator
41
42
43 class DockerImage:
44
45     def __init__(self, image_hash):
46         self.docker_id = image_hash['Id']
47         self.size = image_hash['VirtualSize']
48         self.last_used = -1
49
50     def used_at(self, use_time):
51         self.last_used = max(self.last_used, use_time)
52
53
54 class DockerImages:
55
56     def __init__(self, target_size):
57         self.target_size = target_size
58         self.images = {}
59         self.container_image_map = {}
60
61     @classmethod
62     def from_daemon(cls, target_size, docker_client):
63         images = cls(target_size)
64         for image in docker_client.images():
65             images.add_image(image)
66         return images
67
68     def add_image(self, image_hash):
69         image = DockerImage(image_hash)
70         self.images[image.docker_id] = image
71         logger.debug("Registered image %s", image.docker_id)
72
73     def del_image(self, image_id):
74         if image_id in self.images:
75             del self.images[image_id]
76             self.container_image_map = {
77                 cid: cid_image
78                 for cid, cid_image in self.container_image_map.items()
79                 if cid_image != image_id}
80             logger.debug("Unregistered image %s", image_id)
81
82     def has_image(self, image_id):
83         return image_id in self.images
84
85     def add_user(self, container_hash, use_time):
86         image_id = container_hash['Image']
87         if image_id in self.images:
88             self.container_image_map[container_hash['Id']] = image_id
89             self.images[image_id].used_at(use_time)
90             logger.debug("Registered container %s using image %s",
91                          container_hash['Id'], image_id)
92
93     def end_user(self, cid):
94         self.container_image_map.pop(cid, None)
95         logger.debug("Unregistered container %s", cid)
96
97     def should_delete(self):
98         if not self.images:
99             return
100         # Build a list of images, ordered by use time.
101         lru_images = list(self.images.values())
102         lru_images.sort(key=lambda image: image.last_used)
103         # Make sure we don't delete any images in use, or if there are
104         # none, the most recently used image.
105         if self.container_image_map:
106             keep_ids = set(self.container_image_map.values())
107         else:
108             keep_ids = {lru_images[-1].docker_id}
109         space_left = (self.target_size - sum(self.images[image_id].size
110                                              for image_id in keep_ids))
111         # Go through the list most recently used first, and note which
112         # images can be saved with the space allotted.
113         for image in reversed(lru_images):
114             if (image.docker_id not in keep_ids) and (image.size <= space_left):
115                 keep_ids.add(image.docker_id)
116                 space_left -= image.size
117         # Yield the Docker IDs of any image we don't want to save, least
118         # recently used first.
119         for image in lru_images:
120             if image.docker_id not in keep_ids:
121                 yield image.docker_id
122
123
124 class DockerEventHandlers:
125     # This class maps Docker event types to the names of methods that should
126     # receive those events.
127
128     def __init__(self):
129         self.handler_names = collections.defaultdict(list)
130
131     def on(self, *status_names):
132         def register_handler(handler_method):
133             for status in status_names:
134                 self.handler_names[status].append(handler_method.__name__)
135             return handler_method
136         return register_handler
137
138     def for_event(self, status):
139         return iter(self.handler_names[status])
140
141     def copy(self):
142         result = self.__class__()
143         result.handler_names = copy.deepcopy(self.handler_names)
144         return result
145
146
147 class DockerEventListener:
148     # To use this class, define event_handlers as an instance of
149     # DockerEventHandlers.  Call run() to iterate over events and call the
150     # handler methods as they come in.
151     ENCODING = 'utf-8'
152
153     def __init__(self, events):
154         self.events = events
155
156     def run(self):
157         for event in self.events:
158             event = json.loads(event.decode(self.ENCODING))
159             if event.get('Type', 'container') != 'container':
160                 continue
161             for method_name in self.event_handlers.for_event(event.get('status')):
162                 getattr(self, method_name)(event)
163
164
165 class DockerImageUseRecorder(DockerEventListener):
166     event_handlers = DockerEventHandlers()
167
168     def __init__(self, images, docker_client, events):
169         self.images = images
170         self.docker_client = docker_client
171         super().__init__(events)
172
173     @event_handlers.on('create')
174     @return_when_docker_not_found()
175     def load_container(self, event):
176         container_hash = self.docker_client.inspect_container(event['id'])
177         self.new_container(event, container_hash)
178
179     def new_container(self, event, container_hash):
180         self.images.add_user(container_hash, event['time'])
181
182     @event_handlers.on('destroy')
183     def container_stopped(self, event):
184         self.images.end_user(event['id'])
185
186
187 class DockerImageCleaner(DockerImageUseRecorder):
188     event_handlers = DockerImageUseRecorder.event_handlers.copy()
189
190     def __init__(self, images, docker_client, events, remove_containers_onexit=False):
191         super().__init__(images, docker_client, events)
192         self.logged_unknown = set()
193         self.remove_containers_onexit = remove_containers_onexit
194
195     def new_container(self, event, container_hash):
196         container_image_id = container_hash['Image']
197         if not self.images.has_image(container_image_id):
198             image_hash = self.docker_client.inspect_image(container_image_id)
199             self.images.add_image(image_hash)
200         return super().new_container(event, container_hash)
201
202     def _remove_container(self, cid):
203         try:
204             self.docker_client.remove_container(cid, v=True)
205         except docker.errors.APIError as error:
206             logger.warning("Failed to remove container %s: %s", cid, error)
207         else:
208             logger.info("Removed container %s", cid)
209
210     @event_handlers.on('die')
211     def clean_container(self, event=None):
212         if self.remove_containers_onexit:
213             self._remove_container(event['id'])
214
215     def check_stopped_containers(self, remove=False):
216         logger.info("Checking for stopped containers")
217         for c in self.docker_client.containers(filters={'status': 'exited'}):
218             logger.info("Container %s %s", c['Id'], c['Status'])
219             if c['Status'][:6] != 'Exited':
220                 logger.error("Unexpected status %s for container %s",
221                              c['Status'], c['Id'])
222             elif remove:
223                 self._remove_container(c['Id'])
224
225     @event_handlers.on('destroy')
226     def clean_images(self, event=None):
227         for image_id in self.images.should_delete():
228             try:
229                 self.docker_client.remove_image(image_id)
230             except docker.errors.APIError as error:
231                 logger.warning(
232                     "Failed to remove image %s: %s", image_id, error)
233             else:
234                 logger.info("Removed image %s", image_id)
235                 self.images.del_image(image_id)
236
237     @event_handlers.on('destroy')
238     def log_unknown_images(self, event):
239         unknown_ids = {image['Id'] for image in self.docker_client.images()
240                        if not self.images.has_image(image['Id'])}
241         for image_id in (unknown_ids - self.logged_unknown):
242             logger.info(
243                 "Image %s is loaded but unused, so it won't be cleaned",
244                 image_id)
245         self.logged_unknown = unknown_ids
246
247
248 def human_size(size_str):
249     size_str = size_str.lower().rstrip('b')
250     multiplier = SUFFIX_SIZES.get(size_str[-1])
251     if multiplier is None:
252         multiplier = 1
253     else:
254         size_str = size_str[:-1]
255     return int(size_str) * multiplier
256
257
258 def load_config(arguments):
259     args = parse_arguments(arguments)
260
261     config = default_config()
262     try:
263         with open(args.config, 'r') as f:
264             c = json.load(f)
265             config.update(c)
266     except (FileNotFoundError, IOError, ValueError) as error:
267         if (isinstance(error, FileNotFoundError) and
268             args.config == DEFAULT_CONFIG_FILE):
269             logger.warning("DEPRECATED: default config file %s not found; "
270                            "relying on command line configuration",
271                            repr(DEFAULT_CONFIG_FILE))
272         else:
273             sys.exit('error reading config file {}: {}'.format(
274                 args.config, error))
275
276     configargs = vars(args).copy()
277     configargs.pop('config')
278     config.update({k: v for k, v in configargs.items() if v})
279
280     if isinstance(config['Quota'], str):
281         config['Quota'] = human_size(config['Quota'])
282
283     return config
284
285
286 def default_config():
287     return {
288         'Quota': '1G',
289         'RemoveStoppedContainers': 'always',
290         'Verbose': 0,
291     }
292
293
294 def parse_arguments(arguments):
295     class Formatter(argparse.ArgumentDefaultsHelpFormatter,
296                     argparse.RawDescriptionHelpFormatter):
297         pass
298     parser = argparse.ArgumentParser(
299         prog="arvados_docker.cleaner",
300         description="clean old Docker images from Arvados compute nodes",
301         epilog="Example config file:\n\n{}".format(
302             json.dumps(default_config(), indent=4)),
303         formatter_class=Formatter,
304     )
305     parser.add_argument(
306         '--config', action='store', type=str, default=DEFAULT_CONFIG_FILE,
307         help="configuration file")
308
309     deprecated = " (DEPRECATED -- use config file instead)"
310     parser.add_argument(
311         '--quota', action='store', type=human_size, dest='Quota',
312         help="space allowance for Docker images, suffixed with K/M/G/T" + deprecated)
313     parser.add_argument(
314         '--remove-stopped-containers', type=str, default='always', dest='RemoveStoppedContainers',
315         choices=['never', 'onexit', 'always'],
316         help="""when to remove stopped containers (default: always, i.e., remove
317         stopped containers found at startup, and remove containers as
318         soon as they exit)""" + deprecated)
319     parser.add_argument(
320         '--verbose', '-v', action='count', default=0, dest='Verbose',
321         help="log more information" + deprecated)
322
323     return parser.parse_args(arguments)
324
325
326 def setup_logging():
327     log_handler = logging.StreamHandler()
328     log_handler.setFormatter(logging.Formatter(
329         '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
330         '%Y-%m-%d %H:%M:%S'))
331     logger.addHandler(log_handler)
332
333
334 def configure_logging(config):
335     logger.setLevel(logging.ERROR - (10 * config['Verbose']))
336
337
338 def run(config, docker_client):
339     start_time = int(time.time())
340     logger.debug("Loading Docker activity through present")
341     images = DockerImages.from_daemon(config['Quota'], docker_client)
342     use_recorder = DockerImageUseRecorder(
343         images, docker_client, docker_client.events(since=1, until=start_time))
344     use_recorder.run()
345     cleaner = DockerImageCleaner(
346         images, docker_client, docker_client.events(since=start_time),
347         remove_containers_onexit=config['RemoveStoppedContainers'] != 'never')
348     cleaner.check_stopped_containers(
349         remove=config['RemoveStoppedContainers'] == 'always')
350     logger.info("Checking image quota at startup")
351     cleaner.clean_images()
352     logger.info("Listening for docker events")
353     cleaner.run()
354
355
356 def main(arguments=sys.argv[1:]):
357     setup_logging()
358     config = load_config(arguments)
359     configure_logging(config)
360     try:
361         run(config, docker.Client(version='1.14'))
362     except KeyboardInterrupt:
363         sys.exit(1)
364
365 if __name__ == '__main__':
366     main()