# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from builtins import next

import argparse
import collections
import datetime
import errno
import fcntl
import json
import logging
import os
import re
import sys
import tarfile
import tempfile

from operator import itemgetter
from stat import ST_MTIME, ST_SIZE

if os.name == "posix" and sys.version_info[0] < 3:
    import subprocess32 as subprocess
else:
    import subprocess

import arvados
import arvados.config
import arvados.util
import arvados.commands._util as arv_cmd
import arvados.commands.put as arv_put
import ciso8601
from arvados.collection import CollectionReader
from arvados._version import __version__

logger = logging.getLogger('arvados.keepdocker')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
                else logging.INFO)

EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)

DockerImage = collections.namedtuple(
    'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])

keepdocker_parser = argparse.ArgumentParser(add_help=False)
keepdocker_parser.add_argument(
    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
keepdocker_parser.add_argument(
    '-f', '--force', action='store_true', default=False,
    help="Re-upload the image even if it already exists on the server")
keepdocker_parser.add_argument(
    '--force-image-format', action='store_true', default=False,
    help="Proceed even if the image format is not supported by the server")

_group = keepdocker_parser.add_mutually_exclusive_group()
_group.add_argument(
    '--pull', action='store_true', default=False,
    help="Try to pull the latest image from Docker registry")
_group.add_argument(
    '--no-pull', action='store_false', dest='pull',
    help="Use locally installed image only, don't pull image from Docker registry (default)")

# Combine keepdocker options listed above with run_opts options of arv-put.
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
arg_parser = argparse.ArgumentParser(
    description="Upload or list Docker images in Arvados",
    parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])

arg_parser.add_argument(
    'image', nargs='?',
    help="Docker image to upload: repo, repo:tag, or hash")
arg_parser.add_argument(
    'tag', nargs='?',
    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")

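# Illustrative usage (not part of this module's execution; the project UUID
# and image name below are placeholders):
#
#     arv-keepdocker --pull --project-uuid zzzzz-j7d0g-0123456789abcde \
#         myrepo/myimage latest
#
# With no image argument (or the literal argument 'images'), arv-keepdocker
# lists the Docker images already stored in Arvados instead of uploading.
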
class DockerError(Exception):
    pass

def popen_docker(cmd, *args, **kwargs):
    manage_stdin = ('stdin' not in kwargs)
    kwargs.setdefault('stdin', subprocess.PIPE)
    kwargs.setdefault('stdout', sys.stderr)
    try:
        docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
    except OSError:  # No docker.io in $PATH
        docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
    if manage_stdin:
        docker_proc.stdin.close()
    return docker_proc

def check_docker(proc, description):
    proc.wait()
    if proc.returncode != 0:
        raise DockerError("docker {} returned status code {}".
                          format(description, proc.returncode))

def docker_image_format(image_hash):
    """Return the registry format ('v1' or 'v2') of the given image."""
    cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
                       stdout=subprocess.PIPE)
    try:
        image_id = next(cmd.stdout).decode().strip()
        if image_id.startswith('sha256:'):
            return 'v2'
        elif ':' not in image_id:
            return 'v1'
        else:
            return 'unknown'
    finally:
        check_docker(cmd, "inspect")

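# Illustrative mapping, following the branching above (IDs are truncated
# placeholders):
#
#     inspected .Id 'sha256:4bcdf...'  ->  'v2'  (content-addressed ID)
#     inspected .Id '9fae398ab562...'  ->  'v1'  (bare hex, no colon)
#     anything else with a colon       ->  'unknown'
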
def docker_image_compatible(api, image_hash):
    supported = api._rootDesc.get('dockerImageFormats', [])
    if not supported:
        logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
        return False

    fmt = docker_image_format(image_hash)
    if fmt in supported:
        return True
    else:
        logger.error("image format is {!r} "
                     "but server supports only {!r}".format(fmt, supported))
        return False

def docker_images():
    # Yield a DockerImage tuple for each installed image.
    list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
    list_output = iter(list_proc.stdout)
    next(list_output)  # Ignore the header line
    for line in list_output:
        words = line.split()
        words = [word.decode() for word in words]
        size_index = len(words) - 2
        repo, tag, imageid = words[:3]
        ctime = ' '.join(words[3:size_index])
        vsize = ' '.join(words[size_index:])
        yield DockerImage(repo, tag, imageid, ctime, vsize)
    list_proc.stdout.close()
    check_docker(list_proc, "images")

def find_image_hashes(image_search, image_tag=None):
    # Given one argument, search for Docker images with matching hashes,
    # and return their full hashes in a set.
    # Given two arguments, also search for a Docker image with the
    # same repository and tag.  If one is found, return its hash in a
    # set; otherwise, fall back to the one-argument hash search.
    # Returns a set of matching hashes: empty if no match is found,
    # and holding more than one hash if the search is ambiguous.
    hash_search = image_search.lower()
    hash_matches = set()
    for image in docker_images():
        if (image.repo == image_search) and (image.tag == image_tag):
            return set([image.hash])
        elif image.hash.startswith(hash_search):
            hash_matches.add(image.hash)
    return hash_matches

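# Illustrative calls (not executed; hashes and names are placeholders):
#
#     find_image_hashes('9fae398ab562')           # unique prefix -> one-hash set
#     find_image_hashes('9f')                     # short prefix -> possibly many
#     find_image_hashes('myrepo/img', 'latest')   # repo+tag match wins outright
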
def find_one_image_hash(image_search, image_tag=None):
    hashes = find_image_hashes(image_search, image_tag)
    hash_count = len(hashes)
    if hash_count == 1:
        return hashes.pop()
    elif hash_count == 0:
        raise DockerError("no matching image found")
    else:
        raise DockerError("{} images match {}".format(hash_count, image_search))

def stat_cache_name(image_file):
    return getattr(image_file, 'name', image_file) + '.stat'

def pull_image(image_name, image_tag):
    check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
                 "pull")

def save_image(image_hash, image_file):
    # Save the specified Docker image to image_file, then try to save its
    # stats so we can try to resume after interruption.
    check_docker(popen_docker(['save', image_hash], stdout=image_file),
                 "save")
    image_file.flush()
    try:
        with open(stat_cache_name(image_file), 'w') as statfile:
            json.dump(tuple(os.fstat(image_file.fileno())), statfile)
    except STAT_CACHE_ERRORS:
        pass  # We won't resume from this cache.  No big deal.

def get_cache_dir():
    return arv_cmd.make_home_conf_dir(
        os.path.join('.cache', 'arvados', 'docker'), 0o700)

def prep_image_file(filename):
    # Return a file object ready to save a Docker image,
    # and a boolean indicating whether or not we need to actually save the
    # image (False if a cached save is available).
    cache_dir = get_cache_dir()
    if cache_dir is None:
        image_file = tempfile.NamedTemporaryFile(suffix='.tar')
        need_save = True
    else:
        file_path = os.path.join(cache_dir, filename)
        try:
            with open(stat_cache_name(file_path)) as statfile:
                prev_stat = json.load(statfile)
            now_stat = os.stat(file_path)
            need_save = any(prev_stat[field] != now_stat[field]
                            for field in [ST_MTIME, ST_SIZE])
        except STAT_CACHE_ERRORS + (AttributeError, IndexError):
            need_save = True  # We couldn't compare against old stats
        image_file = open(file_path, 'w+b' if need_save else 'rb')
    return image_file, need_save

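# A note on the resume flow implied above: the stat cache is only written
# after a successful `docker save`, so an interrupted save leaves <hash>.tar
# without matching stats. prep_image_file() then returns need_save=True and
# the tar is regenerated rather than trusted; need_save=False only when the
# cached ST_MTIME and ST_SIZE still match the file on disk.
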
def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    link_attrs.update({'link_class': link_class, 'name': link_name})
    return api_client.links().create(body=link_attrs).execute(
        num_retries=num_retries)

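# Illustrative call (not executed; UUIDs and hash are placeholders),
# mirroring how main() attaches metadata links to an image collection:
#
#     make_link(api_client, 3, 'docker_image_hash', 'sha256:4bcdf...',
#               head_uuid='zzzzz-4zz18-0123456789abcde',
#               owner_uuid='zzzzz-j7d0g-0123456789abcde', properties={})
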
def docker_link_sort_key(link):
    """Build a sort key to find the latest available Docker image.

    To find one source collection for a Docker image referenced by
    name or image id, the API server looks for a link with the most
    recent `image_timestamp` property; then the most recent
    `created_at` timestamp.  This method generates a sort key for
    Docker metadata links to sort them from least to most preferred.
    """
    try:
        image_timestamp = ciso8601.parse_datetime_as_naive(
            link['properties']['image_timestamp'])
    except (KeyError, ValueError):
        image_timestamp = EARLIEST_DATETIME
    try:
        created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
    except ValueError:
        created_timestamp = None
    return (image_timestamp, created_timestamp)

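# Illustrative sort keys (not executed; links are trimmed to the fields used):
#
#     docker_link_sort_key({'properties': {'image_timestamp': '2024-05-01T00:00:00Z'},
#                           'created_at': '2024-06-01T00:00:00Z'})
#     # -> (datetime(2024, 5, 1, 0, 0), datetime(2024, 6, 1, 0, 0))
#
#     docker_link_sort_key({'properties': {},
#                           'created_at': '2024-06-01T00:00:00Z'})
#     # -> (EARLIEST_DATETIME, datetime(2024, 6, 1, 0, 0))
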
def _get_docker_links(api_client, num_retries, **kwargs):
    links = arvados.util.list_all(api_client.links().list,
                                  num_retries, **kwargs)
    for link in links:
        link['_sort_key'] = docker_link_sort_key(link)
    links.sort(key=itemgetter('_sort_key'), reverse=True)
    return links

def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
    timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
    return {
        '_sort_key': link['_sort_key'],
        'timestamp': link['_sort_key'][timestamp_index],
        'collection': link['head_uuid'],
        'dockerhash': dockerhash,
        'repo': repo,
        'tag': tag,
        }

def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
    """List all Docker images known to the api_client with image_name and
    image_tag.  If no image_name is given, defaults to listing all
    Docker images.

    Returns a list of tuples representing matching Docker images,
    sorted in preference order (i.e. the first collection in the list
    is the one that the API server would use). Each tuple is a
    (collection_uuid, collection_info) pair, where collection_info is
    a dict with fields "dockerhash", "repo", "tag", and "timestamp".

    """
    search_filters = []
    repo_links = None
    hash_links = None
    if image_name:
        # Find images with the name the user specified.
        search_links = _get_docker_links(
            api_client, num_retries,
            filters=[['link_class', '=', 'docker_image_repo+tag'],
                     ['name', '=',
                      '{}:{}'.format(image_name, image_tag or 'latest')]])
        if search_links:
            repo_links = search_links
        else:
            # Fall back to finding images with the specified image hash.
            search_links = _get_docker_links(
                api_client, num_retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', 'ilike', image_name + '%']])
            hash_links = search_links
        # Only list information about images that were found in the search.
        search_filters.append(['head_uuid', 'in',
                               [link['head_uuid'] for link in search_links]])

    # It should be reasonable to expect that each collection only has one
    # image hash (though there may be many links specifying this).  Find
    # the API server's most preferred image hash link for each collection.
    if hash_links is None:
        hash_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=', 'docker_image_hash']])
    hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}

    # Each collection may have more than one name (though again, one name
    # may be specified more than once).  Build an image listing from name
    # tags, sorted by API server preference.
    if repo_links is None:
        repo_links = _get_docker_links(
            api_client, num_retries,
            filters=search_filters + [['link_class', '=',
                                       'docker_image_repo+tag']])
    seen_image_names = collections.defaultdict(set)
    images = []
    for link in repo_links:
        collection_uuid = link['head_uuid']
        if link['name'] in seen_image_names[collection_uuid]:
            continue
        seen_image_names[collection_uuid].add(link['name'])
        try:
            dockerhash = hash_link_map[collection_uuid]['name']
        except KeyError:
            dockerhash = '<unknown>'
        name_parts = link['name'].split(':', 1)
        images.append(_new_image_listing(link, dockerhash, *name_parts))

    # Find any image hash links that did not have a corresponding name link,
    # and add image listings for them, retaining the API server preference
    # sort order.
    images_start_size = len(images)
    for collection_uuid, link in hash_link_map.items():
        if not seen_image_names[collection_uuid]:
            images.append(_new_image_listing(link, link['name']))
    if len(images) > images_start_size:
        images.sort(key=itemgetter('_sort_key'), reverse=True)

    # Remove any image listings that refer to unknown collections.
    existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
            api_client.collections().list, num_retries,
            filters=[['uuid', 'in', [im['collection'] for im in images]]],
            select=['uuid'])}
    return [(image['collection'], image) for image in images
            if image['collection'] in existing_coll_uuids]

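# Illustrative use (not executed; assumes a configured Arvados client and a
# placeholder image name):
#
#     api = arvados.api('v1')
#     for coll_uuid, info in list_images_in_arv(api, 3, image_name='myrepo/img'):
#         print(coll_uuid, info['repo'], info['tag'], info['dockerhash'][:12])
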
def items_owned_by(owner_uuid, arv_items):
    return (item for item in arv_items if item['owner_uuid'] == owner_uuid)

def _uuid2pdh(api, uuid):
    return api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()['items'][0]['portable_data_hash']

def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
    args = arg_parser.parse_args(arguments)
    if api is None:
        api = arvados.api('v1')

    if args.image is None or args.image == 'images':
        fmt = "{:30} {:10} {:12} {:29} {:20}\n"
        stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
        try:
            for i, j in list_images_in_arv(api, args.retries):
                stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise
        sys.exit(0)

    if re.search(r':\w[-.\w]{0,127}$', args.image):
        # image ends with :valid-tag
        if args.tag is not None:
            logger.error(
                "image %r already includes a tag, cannot add tag argument %r",
                args.image, args.tag)
            sys.exit(1)
        # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
        args.image, args.tag = args.image.rsplit(':', 1)
    elif args.tag is None:
        args.tag = 'latest'

    # Pull the image if requested, unless the image is specified as a hash
    # that we already have.
    if args.pull and not find_image_hashes(args.image):
        pull_image(args.image, args.tag)

    try:
        image_hash = find_one_image_hash(args.image, args.tag)
    except DockerError as error:
        logger.error(str(error))
        sys.exit(1)

    if not docker_image_compatible(api, image_hash):
        if args.force_image_format:
            logger.warning("forcing incompatible image")
        else:
            logger.error("refusing to store "
                         "incompatible format (use --force-image-format to override)")
            sys.exit(1)

    image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None

    if args.name is None:
        if image_repo_tag:
            collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
        else:
            collection_name = 'Docker image {}'.format(image_hash[0:12])
    else:
        collection_name = args.name

    # Acquire a lock so that only one arv-keepdocker process will
    # dump/upload a particular docker image at a time.  Do this before
    # checking if the image already exists in Arvados so that if there
    # is an upload already underway, when that upload completes and
    # this process gets a turn, it will discover the Docker image is
    # already available and exit quickly.
    outfile_name = '{}.tar'.format(image_hash)
    lockfile_name = '{}.lock'.format(outfile_name)
    lockfile = None
    cache_dir = get_cache_dir()
    if cache_dir:
        lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
        fcntl.flock(lockfile, fcntl.LOCK_EX)

    try:
        if not args.force:
            # Check if this image is already in Arvados.

            # Project where everything should be owned
            parent_project_uuid = args.project_uuid or api.users().current().execute(
                num_retries=args.retries)['uuid']

            # Find image hash tags
            existing_links = _get_docker_links(
                api, args.retries,
                filters=[['link_class', '=', 'docker_image_hash'],
                         ['name', '=', image_hash]])

            if existing_links:
                # get readable collections
                collections = api.collections().list(
                    filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
                    select=["uuid", "owner_uuid", "name", "manifest_text"]
                    ).execute(num_retries=args.retries)['items']

                if collections:
                    # check for repo+tag links on these collections
                    if image_repo_tag:
                        existing_repo_tag = _get_docker_links(
                            api, args.retries,
                            filters=[['link_class', '=', 'docker_image_repo+tag'],
                                     ['name', '=', image_repo_tag],
                                     ['head_uuid', 'in', [c["uuid"] for c in collections]]])
                    else:
                        existing_repo_tag = []

                    try:
                        coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
                    except StopIteration:
                        # create new collection owned by the project
                        coll_uuid = api.collections().create(
                            body={"manifest_text": collections[0]['manifest_text'],
                                  "name": collection_name,
                                  "owner_uuid": parent_project_uuid},
                            ensure_unique_name=True
                            ).execute(num_retries=args.retries)['uuid']

                    link_base = {'owner_uuid': parent_project_uuid,
                                 'head_uuid': coll_uuid,
                                 'properties': existing_links[0]['properties']}

                    if not any(items_owned_by(parent_project_uuid, existing_links)):
                        # create image link owned by the project
                        make_link(api, args.retries,
                                  'docker_image_hash', image_hash, **link_base)

                    if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
                        # create repo+tag link owned by the project
                        make_link(api, args.retries, 'docker_image_repo+tag',
                                  image_repo_tag, **link_base)

                    stdout.write(coll_uuid + "\n")

                    sys.exit(0)

        # Open a file for the saved image, and write it if needed.
        image_file, need_save = prep_image_file(outfile_name)
        if need_save:
            save_image(image_hash, image_file)

        # Call arv-put with switches we inherited from it
        # (a.k.a., switches that aren't our own).
        if arguments is None:
            arguments = sys.argv[1:]
        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
        put_args = keepdocker_parser.parse_known_args(arguments)[1]

        if args.name is None:
            put_args += ['--name', collection_name]

        coll_uuid = arv_put.main(
            put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
            install_sig_handlers=install_sig_handlers).strip()

        # Read the image metadata and make Arvados links from it.
        image_file.seek(0)
        image_tar = tarfile.open(fileobj=image_file)
        image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
        if image_hash_type:
            json_filename = raw_image_hash + '.json'
        else:
            json_filename = raw_image_hash + '/json'
        json_file = image_tar.extractfile(image_tar.getmember(json_filename))
        image_metadata = json.loads(json_file.read().decode())
        json_file.close()
        image_tar.close()
        link_base = {'head_uuid': coll_uuid, 'properties': {}}
        if 'created' in image_metadata:
            link_base['properties']['image_timestamp'] = image_metadata['created']
        if args.project_uuid is not None:
            link_base['owner_uuid'] = args.project_uuid

        make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
        if image_repo_tag:
            make_link(api, args.retries,
                      'docker_image_repo+tag', image_repo_tag, **link_base)

        # Clean up.
        image_file.close()
        for filename in [stat_cache_name(image_file), image_file.name]:
            try:
                os.unlink(filename)
            except OSError as error:
                if error.errno != errno.ENOENT:
                    raise
    finally:
        if lockfile is not None:
            # Closing the lockfile unlocks it.
            lockfile.close()

if __name__ == '__main__':
    main()