# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from builtins import next
import argparse
import collections
import datetime
import errno
import fcntl
import json
import logging
import os
import re
import sys
import tarfile
import tempfile
from operator import itemgetter
from stat import ST_MTIME, ST_SIZE

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put
import ciso8601
from arvados.collection import CollectionReader

from arvados._version import __version__

logger = logging.getLogger('arvados.keepdocker')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
                else logging.INFO)

EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)

DockerImage = collections.namedtuple(
    'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])

keepdocker_parser = argparse.ArgumentParser(add_help=False)
keepdocker_parser.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
keepdocker_parser.add_argument(
    '-f', '--force', action='store_true', default=False,
    help="Re-upload the image even if it already exists on the server")
keepdocker_parser.add_argument(
    '--force-image-format', action='store_true', default=False,
    help="Proceed even if the image format is not supported by the server")

_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
    '--pull', action='store_true', default=False,
    help="Try to pull the latest image from Docker registry")
_group.add_argument(
    '--no-pull', action='store_false', dest='pull',
    help="Use locally installed image only, don't pull image from Docker registry (default)")

# Combine keepdocker options listed above with run_opts options of arv-put.
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
        description="Upload or list Docker images in Arvados",
        parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])

arg_parser.add_argument(
    'image', nargs='?',
    help="Docker image to upload: repo, repo:tag, or hash")
arg_parser.add_argument(
    'tag', nargs='?',
    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
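
# Illustrative invocations (examples only; `arv-keepdocker --help` shows the
# authoritative option list):
#
#   arv-keepdocker                    # list Docker images already in Arvados
#   arv-keepdocker debian bullseye    # upload debian:bullseye if not yet stored
#   arv-keepdocker --force debian     # re-upload even if already stored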

class DockerError(Exception):
    pass

def popen_docker(cmd, *args, **kwargs):
    manage_stdin = ('stdin' not in kwargs)
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', sys.stderr)
    try:
        docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    except OSError:  # No docker in $PATH, try docker.io
        docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    if manage_stdin:
        docker_proc.stdin.close()
    return docker_proc

def check_docker(proc, description):
    proc.wait()
    if proc.returncode != 0:
        raise DockerError("docker {} returned status code {}".
                          format(description, proc.returncode))

def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                       stdout=subprocess.PIPE)
    try:
        image_id = next(cmd.stdout).decode('utf-8').strip()
        if image_id.startswith('sha256:'):
            return 'v2'
        elif ':' not in image_id:
            return 'v1'
        else:
            return 'unknown'
    finally:
        check_docker(cmd, "inspect")
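
# For reference (illustrative IDs, not real images): a v2 image reports an ID
# like 'sha256:4a5a1...', a v1 image reports a bare hex ID like '4a5a1...',
# and any other 'algo:digest' prefix is reported as 'unknown' above.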

def docker_image_compatible(api, image_hash):
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False

    fmt = docker_image_format(image_hash)

    if fmt in supported:
        return True
    else:
        logger.error("image format is {!r} " \
                     "but server supports only {!r}".format(fmt, supported))
        return False

def docker_images():
    # Yield a DockerImage tuple for each installed image.
    list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    list_output = iter(list_proc.stdout)
    next(list_output)  # Ignore the header line
    for line in list_output:
        words = line.split()
        words = [word.decode('utf-8') for word in words]
        size_index = len(words) - 2
        repo, tag, imageid = words[:3]
        ctime = ' '.join(words[3:size_index])
        vsize = ' '.join(words[size_index:])
        yield DockerImage(repo, tag, imageid, ctime, vsize)
    list_proc.stdout.close()
    check_docker(list_proc, "images")
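
# Illustrative parse (hypothetical row): "debian  latest  sha256:77de...
# 2 weeks ago  124 MB" splits into words whose last two become vsize
# ("124 MB"), whose first three become repo/tag/hash, and whose middle words
# become created ("2 weeks ago").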

def find_image_hashes(image_search, image_tag=None):
    # Query for Docker images with the repository and tag and return
    # the image ids in a list.  Returns an empty list if no match is
    # found.

    list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)

    inspect = list_proc.stdout.read()
    list_proc.stdout.close()

    imageinfo = json.loads(inspect)

    return [i["Id"] for i in imageinfo]

def find_one_image_hash(image_search, image_tag=None):
    hashes = find_image_hashes(image_search, image_tag)
    hash_count = len(hashes)
    if hash_count == 1:
        return hashes.pop()
    elif hash_count == 0:
        raise DockerError("no matching image found")
    else:
        raise DockerError("{} images match {}".format(hash_count, image_search))

def stat_cache_name(image_file):
    return getattr(image_file, 'name', image_file) + '.stat'

def pull_image(image_name, image_tag):
    check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
                 "pull")

def save_image(image_hash, image_file):
    # Save the specified Docker image to image_file, then try to save its
    # stats so we can try to resume after interruption.
    check_docker(popen_docker(['save', image_hash], stdout=image_file),
                 "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        pass  # We won't resume from this cache.  No big deal.

def get_cache_dir():
    return arv_cmd.make_home_conf_dir(
        os.path.join('.cache', 'arvados', 'docker'), 0o700)
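
# Note: make_home_conf_dir resolves this path relative to the user's home
# directory, so the cache would typically be ~/.cache/arvados/docker (created
# with mode 0700); callers handle a None return when no home directory is
# available.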

def prep_image_file(filename):
    # Return a file object ready to save a Docker image,
    # and a boolean indicating whether or not we need to actually save the
    # image (False if a cached save is available).
    cache_dir = get_cache_dir()
    if cache_dir is None:
        image_file = tempfile.NamedTemporaryFile(suffix='.tar')
        need_save = True
    else:
        file_path = os.path.join(cache_dir, filename)
        try:
            with open(stat_cache_name(file_path)) as statfile:
                prev_stat = json.load(statfile)
            now_stat = os.stat(file_path)
            need_save = any(prev_stat[field] != now_stat[field]
                            for field in [ST_MTIME, ST_SIZE])
        except STAT_CACHE_ERRORS + (AttributeError, IndexError):
            need_save = True  # We couldn't compare against old stats
        image_file = open(file_path, 'w+b' if need_save else 'rb')
    return image_file, need_save

def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    link_attrs.update({'link_class': link_class, 'name': link_name})
    return api_client.links().create(body=link_attrs).execute(
        num_retries=num_retries)

def docker_link_sort_key(link):
    """Build a sort key to find the latest available Docker image.

    To find one source collection for a Docker image referenced by
    name or image id, the API server looks for a link with the most
    recent `image_timestamp` property; then the most recent
    `created_at` timestamp.  This method generates a sort key for
    Docker metadata links to sort them from least to most preferred.
    """
    try:
        image_timestamp = ciso8601.parse_datetime_as_naive(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        image_timestamp = EARLIEST_DATETIME
    try:
        created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
    except ValueError:
        created_timestamp = None
    return (image_timestamp, created_timestamp)
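
# Illustrative ordering (hypothetical links): a link whose properties include
# image_timestamp '2020-01-01T00:00:00Z' is preferred over a link that was
# created more recently but lacks the property, because a missing
# image_timestamp falls back to EARLIEST_DATETIME in the first key slot.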

def _get_docker_links(api_client, num_retries, **kwargs):
    links = arvados.util.list_all(api_client.links().list,
                                  num_retries, **kwargs)
    for link in links:
        link['_sort_key'] = docker_link_sort_key(link)
    links.sort(key=itemgetter('_sort_key'), reverse=True)
    return links

def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
    timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
    return {
        '_sort_key': link['_sort_key'],
        'timestamp': link['_sort_key'][timestamp_index],
        'collection': link['head_uuid'],
        'dockerhash': dockerhash,
        'repo': repo,
        'tag': tag,
        }

def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
    """List all Docker images known to the api_client with image_name and
    image_tag.  If no image_name is given, defaults to listing all
    Docker images.

    Returns a list of tuples representing matching Docker images,
    sorted in preference order (i.e. the first collection in the list
    is the one that the API server would use). Each tuple is a
    (collection_uuid, collection_info) pair, where collection_info is
    a dict with fields "dockerhash", "repo", "tag", and "timestamp".

    """
    search_filters = []
    repo_links = None
    hash_links = None
    if image_name:
        # Find images with the name the user specified.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]])
        if search_links:
            repo_links = search_links
        else:
            # Fall back to finding images with the specified image hash.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']])
            hash_links = search_links
        # Only list information about images that were found in the search.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # It should be reasonable to expect that each collection only has one
    # image hash (though there may be many links specifying this).  Find
    # the API server's most preferred image hash link for each collection.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']])
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Each collection may have more than one name (though again, one name
    # may be specified more than once).  Build an image listing from name
    # tags, sorted by API server preference.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']])
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        try:
            dockerhash = hash_link_map[collection_uuid]['name']
        except KeyError:
            dockerhash = '<unknown>'
        name_parts = link['name'].split(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Find any image hash links that did not have a corresponding name link,
    # and add image listings for them, retaining the API server preference
    # sort order.
    images_start_size = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > images_start_size:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Remove any image listings that refer to unknown collections.
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
            api_client.collections().list, num_retries,
            filters=[['uuid', 'in', [im['collection'] for im in images]]],
            select=['uuid'])}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]
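
# Illustrative call (hypothetical UUID and hash): list_images_in_arv(api, 3, 'debian')
# might return
#   [('zzzzz-4zz18-aaaaaaaaaaaaaaa',
#     {'dockerhash': 'sha256:77de...', 'repo': 'debian', 'tag': 'latest',
#      'timestamp': datetime.datetime(...), ...})]
# with the first entry being the collection the API server itself would pick.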

def items_owned_by(owner_uuid, arv_items):
    return (item for item in arv_items if item['owner_uuid'] == owner_uuid)

def _uuid2pdh(api, uuid):
    return api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()['items'][0]['portable_data_hash']

def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1')

    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    try:
        image_hash = find_one_image_hash(args.image, args.tag)
    except DockerError as error:
        logger.error(str(error))
        sys.exit(1)

    if not docker_image_compatible(api, image_hash):
        if args.force_image_format:
            logger.warning("forcing incompatible image")
        else:
            logger.error("refusing to store " \
                         "incompatible format (use --force-image-format to override)")
            sys.exit(1)

    # If the user specified the image by hash rather than by name, there is
    # no repo+tag to record.
    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])
            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid,
                                  "properties": {"docker-image-repo-tag": image_repo_tag}},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid':  coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        # Don't fail when cached manifest is invalid, just ignore the cache.
        put_args += ['--batch']

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Managed properties could already be set on the collection, so read
        # them back and update rather than overwrite.
        coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
        coll_properties.update({"docker-image-repo-tag": image_repo_tag})

        api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)

        # Read the image metadata and make Arvados links from it.
        image_file.seek(0)
        image_tar = tarfile.open(fileobj=image_file)
        image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
        if image_hash_type:
            json_filename = raw_image_hash + '.json'
        else:
            json_filename = raw_image_hash + '/json'
        json_file = image_tar.extractfile(image_tar.getmember(json_filename))
        image_metadata = json.loads(json_file.read().decode('utf-8'))
        json_file.close()
        image_tar.close()
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Cleanup.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()

if __name__ == '__main__':
    main()