2 """arvados_docker.cleaner - Remove unused Docker images from compute nodes
5 python3 -m arvados_docker.cleaner --quota 50G
# Byte multipliers for the size suffixes k, m, g and t: successive powers
# of 1024, starting at 1024 ** 1 for 'k'.
SUFFIX_SIZES = dict(zip('kmgt', (1024 ** power for power in range(1, 5))))

# Logger shared by everything in this module.
logger = logging.getLogger('arvados_docker.cleaner')
def return_when_docker_not_found(result=None):
    """Build a decorator that turns Docker 404 errors into a return value.

    When the decorated function raises docker.errors.APIError and the
    error's response status code is 404, the wrapper returns `result`
    instead.  An APIError with any other status code propagates
    unchanged, as does every other exception type.
    """
    def docker_not_found_decorator(orig_func):
        @functools.wraps(orig_func)
        def docker_not_found_wrapper(*args, **kwargs):
            try:
                return orig_func(*args, **kwargs)
            except docker.errors.APIError as error:
                if error.response.status_code == 404:
                    return result
                raise
        return docker_not_found_wrapper
    return docker_not_found_decorator
39 def __init__(self, image_hash):
40 self.docker_id = image_hash['Id']
41 self.size = image_hash['VirtualSize']
def used_at(self, use_time):
    """Record that this image was in use at `use_time`.

    The stored timestamp only moves forward: a `use_time` that is not
    later than the current `last_used` leaves it as it is.
    """
    if use_time > self.last_used:
        self.last_used = use_time
49 def __init__(self, target_size):
50 self.target_size = target_size
52 self.container_image_map = {}
55 def from_daemon(cls, target_size, docker_client):
56 images = cls(target_size)
57 for image in docker_client.images():
58 images.add_image(image)
def add_image(self, image_hash):
    """Start tracking the image described by `image_hash`.

    `image_hash` is wrapped in a DockerImage and stored in `self.images`
    under that object's `docker_id`, replacing any record already held
    for the same ID.
    """
    new_image = DockerImage(image_hash)
    image_id = new_image.docker_id
    self.images[image_id] = new_image
    logger.debug("Registered image %s", image_id)
def del_image(self, image_id):
    """Stop tracking `image_id` and forget the containers that used it.

    Does nothing when the image is not currently tracked.
    """
    if image_id not in self.images:
        return
    del self.images[image_id]
    # Rebuild the container map without the entries that pointed at the
    # image that was just dropped.
    self.container_image_map = {
        container_id: used_image
        for container_id, used_image in self.container_image_map.items()
        if used_image != image_id
    }
    logger.debug("Unregistered image %s", image_id)
def has_image(self, image_id):
    """Return True when `image_id` is a key of the tracked images."""
    tracked = self.images
    return image_id in tracked
def add_user(self, container_hash, use_time):
    """Record that a container used one of the tracked images.

    The image ID is read from `container_hash['Image']`.  When that image
    is tracked, the container (keyed by `container_hash['Id']`) is mapped
    to it and the image's use time is updated through used_at().
    Containers whose image is not tracked are ignored.
    """
    image_id = container_hash['Image']
    if image_id not in self.images:
        return
    container_id = container_hash['Id']
    self.container_image_map[container_id] = image_id
    self.images[image_id].used_at(use_time)
    logger.debug("Registered container %s using image %s",
                 container_id, image_id)
def end_user(self, cid):
    """Forget container `cid`; IDs that were never registered are ignored."""
    if cid in self.container_image_map:
        del self.container_image_map[cid]
    logger.debug("Unregistered container %s", cid)
90 def should_delete(self):
93 # Build a list of images, ordered by use time.
94 lru_images = list(self.images.values())
95 lru_images.sort(key=lambda image: image.last_used)
96 # Make sure we don't delete any images in use, or if there are
97 # none, the most recently used image.
98 if self.container_image_map:
99 keep_ids = set(self.container_image_map.values())
101 keep_ids = {lru_images[-1].docker_id}
102 space_left = (self.target_size - sum(self.images[image_id].size
103 for image_id in keep_ids))
104 # Go through the list most recently used first, and note which
105 # images can be saved with the space allotted.
106 for image in reversed(lru_images):
107 if (image.docker_id not in keep_ids) and (image.size <= space_left):
108 keep_ids.add(image.docker_id)
109 space_left -= image.size
110 # Yield the Docker IDs of any image we don't want to save, least
111 # recently used first.
112 for image in lru_images:
113 if image.docker_id not in keep_ids:
114 yield image.docker_id
117 class DockerEventHandlers:
118 # This class maps Docker event types to the names of methods that should
119 # receive those events.
121 self.handler_names = collections.defaultdict(list)
def on(self, *status_names):
    """Return a decorator that registers a method for the given statuses.

    The decorator appends the decorated method's `__name__` to the
    handler list of every status in `status_names`, then returns the
    method itself unchanged.
    """
    def register_handler(handler_method):
        for status_name in status_names:
            registered = self.handler_names[status_name]
            registered.append(handler_method.__name__)
        return handler_method
    return register_handler
def for_event(self, status):
    """Return an iterator over the handler names registered for `status`.

    The iterator is empty when nothing was registered for `status`.

    The lookup goes through dict.get() rather than indexing: indexing the
    `handler_names` defaultdict would insert an empty list for every
    unregistered status that is merely looked up, so the table would grow
    with each distinct status value passed in.
    """
    return iter(self.handler_names.get(status, ()))
134 result = self.__class__()
135 result.handler_names = copy.deepcopy(self.handler_names)
139 class DockerEventListener:
140 # To use this class, define event_handlers as an instance of
141 # DockerEventHandlers. Call run() to iterate over events and call the
142 # handler methods as they come in.
145 def __init__(self, events):
149 for event in self.events:
150 event = json.loads(event.decode(self.ENCODING))
151 if event.get('Type', 'container') != 'container':
153 for method_name in self.event_handlers.for_event(event.get('status')):
154 getattr(self, method_name)(event)
157 class DockerImageUseRecorder(DockerEventListener):
158 event_handlers = DockerEventHandlers()
160 def __init__(self, images, docker_client, events):
162 self.docker_client = docker_client
163 super().__init__(events)
@event_handlers.on('create')
@return_when_docker_not_found()
def load_container(self, event):
    """Handle a 'create' event by inspecting the container it names.

    The container `event['id']` is inspected through the Docker client
    and the result is passed, together with the event, to
    new_container().  The call is wrapped by
    return_when_docker_not_found().
    """
    container_id = event['id']
    inspected = self.docker_client.inspect_container(container_id)
    self.new_container(event, inspected)
def new_container(self, event, container_hash):
    """Register the container as a user of its image at `event['time']`."""
    image_tracker = self.images
    image_tracker.add_user(container_hash, event['time'])
@event_handlers.on('destroy')
def container_stopped(self, event):
    """Handle a 'destroy' event: the container stops counting as a user."""
    container_id = event['id']
    self.images.end_user(container_id)
179 class DockerImageCleaner(DockerImageUseRecorder):
180 event_handlers = DockerImageUseRecorder.event_handlers.copy()
def __init__(self, images, docker_client, events, remove_containers_onexit=False):
    """Set up the cleaner on top of the parent class.

    `images`, `docker_client` and `events` are handed to the parent
    initializer unchanged.  `remove_containers_onexit` is stored for
    clean_container(), which only removes containers when it is true.
    """
    super().__init__(images, docker_client, events)
    self.remove_containers_onexit = remove_containers_onexit
    # Image IDs that log_unknown_images() has already reported.
    self.logged_unknown = set()
def new_container(self, event, container_hash):
    """Register a new container, first tracking its image if necessary.

    When the image named by `container_hash['Image']` is not tracked
    yet, it is inspected through the Docker client and added to the
    tracked images.  Registration of the container itself is then left
    to the parent implementation, whose result is returned.
    """
    image_id = container_hash['Image']
    if not self.images.has_image(image_id):
        self.images.add_image(self.docker_client.inspect_image(image_id))
    return super().new_container(event, container_hash)
def _remove_container(self, cid):
    """Ask Docker to remove container `cid` and log the outcome.

    The removal is requested with `v=True`.  A docker.errors.APIError is
    logged as a warning and not re-raised; success is logged at info
    level.
    """
    try:
        self.docker_client.remove_container(cid, v=True)
    except docker.errors.APIError as error:
        logger.warning("Failed to remove container %s: %s", cid, error)
        return
    logger.info("Removed container %s", cid)
@event_handlers.on('die')
def clean_container(self, event=None):
    """Handle a 'die' event by removing the container that exited.

    Nothing happens unless `remove_containers_onexit` is set.  `event`
    defaults to None; with no event there is no container ID to act on,
    so the call is a no-op rather than failing with a TypeError when
    subscripting None.
    """
    if event is None or not self.remove_containers_onexit:
        return
    self._remove_container(event['id'])
207 def check_stopped_containers(self, remove=False):
208 logger.info("Checking for stopped containers")
209 for c in self.docker_client.containers(filters={'status': 'exited'}):
210 logger.info("Container %s %s", c['Id'], c['Status'])
211 if c['Status'][:6] != 'Exited':
212 logger.error("Unexpected status %s for container %s",
213 c['Status'], c['Id'])
215 self._remove_container(c['Id'])
@event_handlers.on('destroy')
def clean_images(self, event=None):
    """Remove every image that should_delete() currently nominates.

    Each removal is requested from the Docker client.  An image whose
    removal raises docker.errors.APIError is logged as a warning and
    stays tracked; a removed image is logged at info level and dropped
    from the tracked images.  `event` is not used.
    """
    for doomed_id in self.images.should_delete():
        try:
            self.docker_client.remove_image(doomed_id)
        except docker.errors.APIError as error:
            logger.warning("Failed to remove image %s: %s", doomed_id, error)
            continue
        logger.info("Removed image %s", doomed_id)
        self.images.del_image(doomed_id)
@event_handlers.on('destroy')
def log_unknown_images(self, event):
    """Log images Docker reports that are not among the tracked images.

    Each such image ID is logged once: IDs already in `logged_unknown`
    are skipped, and `logged_unknown` is then replaced with the set of
    untracked IDs found by this call.  `event` is not used.
    """
    unknown_ids = set()
    for image in self.docker_client.images():
        candidate_id = image['Id']
        if not self.images.has_image(candidate_id):
            unknown_ids.add(candidate_id)
    for image_id in unknown_ids.difference(self.logged_unknown):
        logger.info("Image %s is loaded but unused, so it won't be cleaned",
                    image_id)
    self.logged_unknown = unknown_ids
def human_size(size_str):
    """Convert a size such as '50G', '512m' or '1024' into an integer.

    The text is lower-cased and trailing 'b' characters are dropped.  A
    final character found in SUFFIX_SIZES selects the multiplier and is
    removed before the number is parsed; without such a suffix the
    multiplier is 1.

    Raises ValueError when the remaining text is not an integer.  That
    includes an empty string or a bare 'b': the suffix is taken with a
    slice so those inputs reach int() and fail with ValueError instead of
    an IndexError, which argparse would not report as a bad option value.
    """
    size_str = size_str.lower().rstrip('b')
    # Slicing yields '' for empty input, where indexing would raise.
    multiplier = SUFFIX_SIZES.get(size_str[-1:])
    if multiplier is None:
        multiplier = 1
    else:
        size_str = size_str[:-1]
    return int(size_str) * multiplier
def parse_arguments(arguments):
    """Parse the command-line `arguments` and return the resulting namespace.

    Options defined here:
      --quota                      required; converted with human_size
      --remove-stopped-containers  'never', 'onexit' or 'always'
                                   (default 'always')
      --verbose / -v               counted, starting from 0
    """
    parser = argparse.ArgumentParser(
        prog="arvados_docker.cleaner",
        description="clean old Docker images from Arvados compute nodes",
    )
    add_option = parser.add_argument
    add_option(
        '--quota', action='store', type=human_size, required=True,
        help="space allowance for Docker images, suffixed with K/M/G/T")
    add_option(
        '--remove-stopped-containers', type=str, default='always',
        choices=['never', 'onexit', 'always'],
        help="""when to remove stopped containers (default: always, i.e., remove
        stopped containers found at startup, and remove containers as
        soon as they exit)""")
    add_option(
        '--verbose', '-v', action='count', default=0,
        help="log more information")
    return parser.parse_args(arguments)
def setup_logging(args):
    """Attach a stream handler to the module logger and set its level.

    The level starts at ERROR and drops by 10 for each count in
    `args.verbose`, so one -v gives WARNING, two INFO and three DEBUG.
    The result is clamped at DEBUG: four counts would otherwise produce
    level 0 (NOTSET), which makes the logger defer to its parent's level
    and so log less, not more.
    """
    log_handler = logging.StreamHandler()
    log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
    logger.addHandler(log_handler)
    requested_level = logging.ERROR - (10 * args.verbose)
    logger.setLevel(max(logging.DEBUG, requested_level))
273 def run(args, docker_client):
274 start_time = int(time.time())
275 logger.debug("Loading Docker activity through present")
276 images = DockerImages.from_daemon(args.quota, docker_client)
277 use_recorder = DockerImageUseRecorder(
278 images, docker_client, docker_client.events(since=1, until=start_time))
280 cleaner = DockerImageCleaner(
281 images, docker_client, docker_client.events(since=start_time),
282 remove_containers_onexit=args.remove_stopped_containers != 'never')
283 cleaner.check_stopped_containers(
284 remove=args.remove_stopped_containers == 'always')
285 logger.info("Checking image quota at startup")
286 cleaner.clean_images()
287 logger.info("Listening for docker events")
291 args = parse_arguments(arguments)
293 run(args, docker.Client(version='1.14'))
295 if __name__ == '__main__':