8087: makes changes suggested by radhika
[arvados.git] / services / dockercleaner / arvados_docker / cleaner.py
1 #!/usr/bin/env python3
2 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
3
4 Usage:
5   python3 -m arvados_docker.cleaner --quota 50G
6 """
7
8 import argparse
9 import collections
10 import copy
11 import functools
12 import json
13 import logging
14 import sys
15 import time
16
17 import docker
18
19 SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
20
21 logger = logging.getLogger('arvados_docker.cleaner')
22
23 def return_when_docker_not_found(result=None):
24     # If the decorated function raises a 404 error from Docker, return
25     # `result` instead.
26     def docker_not_found_decorator(orig_func):
27         @functools.wraps(orig_func)
28         def docker_not_found_wrapper(*args, **kwargs):
29             try:
30                 return orig_func(*args, **kwargs)
31             except docker.errors.APIError as error:
32                 if error.response.status_code != 404:
33                     raise
34                 return result
35         return docker_not_found_wrapper
36     return docker_not_found_decorator
37
38 class DockerImage:
39     def __init__(self, image_hash):
40         self.docker_id = image_hash['Id']
41         self.size = image_hash['VirtualSize']
42         self.last_used = -1
43
44     def used_at(self, use_time):
45         self.last_used = max(self.last_used, use_time)
46
47
48 class DockerImages:
49     def __init__(self, target_size):
50         self.target_size = target_size
51         self.images = {}
52         self.container_image_map = {}
53
54     @classmethod
55     def from_daemon(cls, target_size, docker_client):
56         images = cls(target_size)
57         for image in docker_client.images():
58             images.add_image(image)
59         return images
60
61     def add_image(self, image_hash):
62         image = DockerImage(image_hash)
63         self.images[image.docker_id] = image
64         logger.debug("Registered image %s", image.docker_id)
65
66     def del_image(self, image_id):
67         if image_id in self.images:
68             del self.images[image_id]
69             self.container_image_map = {
70                 cid: cid_image
71                 for cid, cid_image in self.container_image_map.items()
72                 if cid_image != image_id}
73             logger.debug("Unregistered image %s", image_id)
74
75     def has_image(self, image_id):
76         return image_id in self.images
77
78     def add_user(self, container_hash, use_time):
79         image_id = container_hash['Image']
80         if image_id in self.images:
81             self.container_image_map[container_hash['Id']] = image_id
82             self.images[image_id].used_at(use_time)
83             logger.debug("Registered container %s using image %s",
84                          container_hash['Id'], image_id)
85
86     def end_user(self, cid):
87         self.container_image_map.pop(cid, None)
88         logger.debug("Unregistered container %s", cid)
89
90     def should_delete(self):
91         if not self.images:
92             return
93         # Build a list of images, ordered by use time.
94         lru_images = list(self.images.values())
95         lru_images.sort(key=lambda image: image.last_used)
96         # Make sure we don't delete any images in use, or if there are
97         # none, the most recently used image.
98         if self.container_image_map:
99             keep_ids = set(self.container_image_map.values())
100         else:
101             keep_ids = {lru_images[-1].docker_id}
102         space_left = (self.target_size - sum(self.images[image_id].size
103                                              for image_id in keep_ids))
104         # Go through the list most recently used first, and note which
105         # images can be saved with the space allotted.
106         for image in reversed(lru_images):
107             if (image.docker_id not in keep_ids) and (image.size <= space_left):
108                 keep_ids.add(image.docker_id)
109                 space_left -= image.size
110         # Yield the Docker IDs of any image we don't want to save, least
111         # recently used first.
112         for image in lru_images:
113             if image.docker_id not in keep_ids:
114                 yield image.docker_id
115
116
117 class DockerEventHandlers:
118     # This class maps Docker event types to the names of methods that should
119     # receive those events.
120     def __init__(self):
121         self.handler_names = collections.defaultdict(list)
122
123     def on(self, *status_names):
124         def register_handler(handler_method):
125             for status in status_names:
126                 self.handler_names[status].append(handler_method.__name__)
127             return handler_method
128         return register_handler
129
130     def for_event(self, status):
131         return iter(self.handler_names[status])
132
133     def copy(self):
134         result = self.__class__()
135         result.handler_names = copy.deepcopy(self.handler_names)
136         return result
137
138
139 class DockerEventListener:
140     # To use this class, define event_handlers as an instance of
141     # DockerEventHandlers.  Call run() to iterate over events and call the
142     # handler methods as they come in.
143     ENCODING = 'utf-8'
144
145     def __init__(self, events):
146         self.events = events
147
148     def run(self):
149         for event in self.events:
150             event = json.loads(event.decode(self.ENCODING))
151             for method_name in self.event_handlers.for_event(event['status']):
152                 getattr(self, method_name)(event)
153
154
155 class DockerImageUseRecorder(DockerEventListener):
156     event_handlers = DockerEventHandlers()
157
158     def __init__(self, images, docker_client, events):
159         self.images = images
160         self.docker_client = docker_client
161         super().__init__(events)
162
163     @event_handlers.on('create')
164     @return_when_docker_not_found()
165     def load_container(self, event):
166         container_hash = self.docker_client.inspect_container(event['id'])
167         self.new_container(event, container_hash)
168
169     def new_container(self, event, container_hash):
170         self.images.add_user(container_hash, event['time'])
171
172     @event_handlers.on('destroy')
173     def container_stopped(self, event):
174         self.images.end_user(event['id'])
175
176
177 class DockerImageCleaner(DockerImageUseRecorder):
178     event_handlers = DockerImageUseRecorder.event_handlers.copy()
179
180     def __init__(self, images, docker_client, events, remove_containers_onexit=False):
181         super().__init__(images, docker_client, events)
182         self.logged_unknown = set()
183         self.remove_containers_onexit = remove_containers_onexit
184
185     def new_container(self, event, container_hash):
186         container_image_id = container_hash['Image']
187         if not self.images.has_image(container_image_id):
188             image_hash = self.docker_client.inspect_image(container_image_id)
189             self.images.add_image(image_hash)
190         return super().new_container(event, container_hash)
191
192     def _remove_container(self, cid):
193         try:
194             self.docker_client.remove_container(cid)
195         except docker.errors.APIError as error:
196             logger.warning("Failed to remove container %s: %s", cid, error)
197         else:
198             logger.info("Removed container %s", cid)
199
200     @event_handlers.on('die')
201     def clean_container(self, event=None):
202         if self.remove_containers_onexit:
203             self._remove_container(event['id'])
204
205     def check_stopped_containers(self, remove=False):
206         logger.info("Checking for stopped containers")
207         for c in self.docker_client.containers(filters={'status': 'exited'}):
208             logger.info("Container %s %s", c['Id'], c['Status'])
209             if c['Status'][:6] != 'Exited':
210                 logger.error("Unexpected status %s for container %s",
211                              c['Status'], c['Id'])
212             elif remove:
213                 self._remove_container(c['Id'])
214
215     @event_handlers.on('destroy')
216     def clean_images(self, event=None):
217         for image_id in self.images.should_delete():
218             try:
219                 self.docker_client.remove_image(image_id)
220             except docker.errors.APIError as error:
221                 logger.warning("Failed to remove image %s: %s", image_id, error)
222             else:
223                 logger.info("Removed image %s", image_id)
224                 self.images.del_image(image_id)
225
226     @event_handlers.on('destroy')
227     def log_unknown_images(self, event):
228         unknown_ids = {image['Id'] for image in self.docker_client.images()
229                        if not self.images.has_image(image['Id'])}
230         for image_id in (unknown_ids - self.logged_unknown):
231             logger.info("Image %s is loaded but unused, so it won't be cleaned",
232                         image_id)
233         self.logged_unknown = unknown_ids
234
235
236 def human_size(size_str):
237     size_str = size_str.lower().rstrip('b')
238     multiplier = SUFFIX_SIZES.get(size_str[-1])
239     if multiplier is None:
240         multiplier = 1
241     else:
242         size_str = size_str[:-1]
243     return int(size_str) * multiplier
244
245 def parse_arguments(arguments):
246     parser = argparse.ArgumentParser(
247         prog="arvados_docker.cleaner",
248         description="clean old Docker images from Arvados compute nodes")
249     parser.add_argument(
250         '--quota', action='store', type=human_size, required=True,
251         help="space allowance for Docker images, suffixed with K/M/G/T")
252     parser.add_argument(
253         '--remove-stopped-containers', type=str, default='always',
254         choices=['never', 'onexit', 'always'],
255         help="""when to remove stopped containers (default: always, i.e., remove
256         stopped containers found at startup, and remove containers as
257         soon as they exit)""")
258     parser.add_argument(
259         '--verbose', '-v', action='count', default=0,
260         help="log more information")
261     return parser.parse_args(arguments)
262
263 def setup_logging(args):
264     log_handler = logging.StreamHandler()
265     log_handler.setFormatter(logging.Formatter(
266             '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
267             '%Y-%m-%d %H:%M:%S'))
268     logger.addHandler(log_handler)
269     logger.setLevel(logging.ERROR - (10 * args.verbose))
270
271 def run(args, docker_client):
272     start_time = int(time.time())
273     logger.debug("Loading Docker activity through present")
274     images = DockerImages.from_daemon(args.quota, docker_client)
275     use_recorder = DockerImageUseRecorder(
276         images, docker_client, docker_client.events(since=1, until=start_time))
277     use_recorder.run()
278     cleaner = DockerImageCleaner(
279         images, docker_client, docker_client.events(since=start_time),
280         remove_containers_onexit=args.remove_stopped_containers != 'never')
281     cleaner.check_stopped_containers(
282         remove=args.remove_stopped_containers == 'always')
283     logger.info("Checking image quota at startup")
284     cleaner.clean_images()
285     logger.info("Listening for docker events")
286     cleaner.run()
287
288 def main(arguments):
289     args = parse_arguments(arguments)
290     setup_logging(args)
291     run(args, docker.Client(version='1.14'))
292
293 if __name__ == '__main__':
294     main(sys.argv[1:])