1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from builtins import next
19 if sys.version_info[0] < 3:
20 import subprocess32 as subprocess
24 from operator import itemgetter
29 import arvados.commands._util as arv_cmd
30 import arvados.commands.put as arv_put
31 from arvados.collection import CollectionReader
36 from arvados._version import __version__
# Module-level logger; DEBUG level is enabled when ARVADOS_DEBUG is set
# in the Arvados configuration.
38 logger = logging.getLogger('arvados.keepdocker')
39 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
# Sentinel "oldest possible" timestamp, used by docker_link_sort_key when
# a link has no parsable image_timestamp property.
42 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
# Exceptions treated as non-fatal when reading/writing the stat cache file.
43 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
# One parsed row of `docker images --no-trunc` output (see docker_images()).
45 DockerImage = collections.namedtuple(
46 'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
# Options specific to arv-keepdocker. add_help=False so this parser can be
# used as a parent of arg_parser below without a duplicate -h/--help.
48 keepdocker_parser = argparse.ArgumentParser(add_help=False)
49 keepdocker_parser.add_argument(
50 '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
51 help='Print version and exit.')
52 keepdocker_parser.add_argument(
53 '-f', '--force', action='store_true', default=False,
54 help="Re-upload the image even if it already exists on the server")
55 keepdocker_parser.add_argument(
56 '--force-image-format', action='store_true', default=False,
57 help="Proceed even if the image format is not supported by the server")
# --pull and --no-pull both set args.pull and are mutually exclusive;
# the default is no pull.
59 _group = keepdocker_parser.add_mutually_exclusive_group()
61 '--pull', action='store_true', default=False,
62 help="Try to pull the latest image from Docker registry")
64 '--no-pull', action='store_false', dest='pull',
65 help="Use locally installed image only, don't pull image from Docker registry (default)")
# Positional arguments: the image reference, and an optional tag.
67 keepdocker_parser.add_argument(
69 help="Docker image to upload: repo, repo:tag, or hash")
70 keepdocker_parser.add_argument(
72 help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
74 # Combine keepdocker options listed above with run_opts options of arv-put.
75 # The options inherited from arv-put include --name, --project-uuid,
76 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
77 arg_parser = argparse.ArgumentParser(
78 description="Upload or list Docker images in Arvados",
79 parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
# Raised when an invoked `docker` subcommand fails (see check_docker) or
# when image-hash lookup cannot resolve to exactly one image.
81 class DockerError(Exception):
85 def popen_docker(cmd, *args, **kwargs):
# Launch `docker <cmd...>` via subprocess.Popen and return the process.
# The `docker.io` executable name is tried first; on OSError (not found
# in $PATH) it falls back to plain `docker`. stdout defaults to stderr
# so docker chatter never pollutes this program's stdout.
86 manage_stdin = ('stdin' not in kwargs)
87 kwargs.setdefault('stdin', subprocess.PIPE)
88 kwargs.setdefault('stdout', sys.stderr)
90 docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
91 except OSError:  # No docker.io in $PATH
92 docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
# When we opened the stdin PIPE ourselves, close it immediately so the
# child never blocks waiting for input.
94 docker_proc.stdin.close()
97 def check_docker(proc, description):
# Raise DockerError naming `description` if the docker subprocess `proc`
# (from popen_docker) finished with a nonzero exit status.
99 if proc.returncode != 0:
100 raise DockerError("docker {} returned status code {}".
101 format(description, proc.returncode))
103 def docker_image_format(image_hash):
104 """Return the registry format ('v1' or 'v2') of the given image."""
# Ask docker for the image's canonical ID; only the first output line
# is consumed.
105 cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
106 stdout=subprocess.PIPE)
108 image_id = next(cmd.stdout).decode().strip()
# 'sha256:...' IDs indicate the v2 format; a bare (colon-free) ID the
# v1 format — the corresponding return statements are in the elided lines.
109 if image_id.startswith('sha256:'):
111 elif ':' not in image_id:
116 check_docker(cmd, "inspect")
118 def docker_image_compatible(api, image_hash):
# Check the image's format against the formats the API server advertises
# in its discovery document (dockerImageFormats).
119 supported = api._rootDesc.get('dockerImageFormats', [])
# Server config predates format advertising: warn rather than block.
121 logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
124 fmt = docker_image_format(image_hash)
# Incompatible: report both sides so the operator can reconcile them.
128 logger.error("image format is {!r} " \
129 "but server supports only {!r}".format(fmt, supported))
133 # Yield a DockerImage tuple for each installed image.
134 list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
135 list_output = iter(list_proc.stdout)
136 next(list_output) # Ignore the header line
137 for line in list_output:
139 words = [word.decode() for word in words]
# Column layout of `docker images`: REPOSITORY TAG IMAGE-ID CREATED SIZE.
# CREATED is free text ("2 weeks ago") so it is reassembled from every
# word between the id and the final two size words.
140 size_index = len(words) - 2
141 repo, tag, imageid = words[:3]
142 ctime = ' '.join(words[3:size_index])
143 vsize = ' '.join(words[size_index:])
144 yield DockerImage(repo, tag, imageid, ctime, vsize)
145 list_proc.stdout.close()
146 check_docker(list_proc, "images")
148 def find_image_hashes(image_search, image_tag=None):
149 # Given one argument, search for Docker images with matching hashes,
150 # and return their full hashes in a set.
151 # Given two arguments, also search for a Docker image with the
152 # same repository and tag. If one is found, return its hash in a
153 # set; otherwise, fall back to the one-argument hash search.
154 # Returns None if no match is found, or a hash search is ambiguous.
# Hash prefixes are compared case-insensitively (docker hashes are hex).
155 hash_search = image_search.lower()
157 for image in docker_images():
# An exact repo+tag match wins immediately and short-circuits the scan.
158 if (image.repo == image_search) and (image.tag == image_tag):
159 return set([image.hash])
160 elif image.hash.startswith(hash_search):
161 hash_matches.add(image.hash)
164 def find_one_image_hash(image_search, image_tag=None):
# Resolve image_search (+ optional tag) to exactly one local image hash.
# Raises DockerError when no image matches or the search is ambiguous
# (more than one hash matches).
165 hashes = find_image_hashes(image_search, image_tag)
166 hash_count = len(hashes)
169 elif hash_count == 0:
170 raise DockerError("no matching image found")
172 raise DockerError("{} images match {}".format(hash_count, image_search))
def stat_cache_name(image_file):
    """Return the path of the stat-cache file paired with image_file.

    image_file may be an open file object (its ``name`` attribute is
    used) or a plain path string; either way the cache path is the
    image path with '.stat' appended.
    """
    image_path = getattr(image_file, 'name', image_file)
    return image_path + '.stat'
177 def pull_image(image_name, image_tag):
# Run `docker pull name:tag`; check_docker raises DockerError on failure.
178 check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
181 def save_image(image_hash, image_file):
182 # Save the specified Docker image to image_file, then try to save its
183 # stats so we can try to resume after interruption.
184 check_docker(popen_docker(['save', image_hash], stdout=image_file),
# Writing the stat cache (a JSON dump of fstat) is strictly best-effort:
# any STAT_CACHE_ERRORS failure just means no resume next time.
188 with open(stat_cache_name(image_file), 'w') as statfile:
189 json.dump(tuple(os.fstat(image_file.fileno())), statfile)
190 except STAT_CACHE_ERRORS:
191 pass # We won't resume from this cache. No big deal.
# Per-user download cache (~/.cache/arvados/docker), created with mode 0700.
# arv_cmd.make_home_conf_dir presumably returns None when no home dir is
# available — prep_image_file handles that case. TODO confirm.
194 return arv_cmd.make_home_conf_dir(
195 os.path.join('.cache', 'arvados', 'docker'), 0o700)
197 def prep_image_file(filename):
198 # Return a file object ready to save a Docker image,
199 # and a boolean indicating whether or not we need to actually save the
200 # image (False if a cached save is available).
201 cache_dir = get_cache_dir()
# Without a cache directory fall back to an anonymous temp file, which
# always requires a fresh save.
202 if cache_dir is None:
203 image_file = tempfile.NamedTemporaryFile(suffix='.tar')
206 file_path = os.path.join(cache_dir, filename)
# Compare the cached stat snapshot against the file's current mtime and
# size; any mismatch (or unreadable/corrupt cache) forces a re-save.
208 with open(stat_cache_name(file_path)) as statfile:
209 prev_stat = json.load(statfile)
210 now_stat = os.stat(file_path)
211 need_save = any(prev_stat[field] != now_stat[field]
212 for field in [ST_MTIME, ST_SIZE])
213 except STAT_CACHE_ERRORS + (AttributeError, IndexError):
214 need_save = True # We couldn't compare against old stats
# Open for writing only when a save is needed; otherwise read the cache.
215 image_file = open(file_path, 'w+b' if need_save else 'rb')
216 return image_file, need_save
def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
    """Create an Arvados link record and return the API response dict.

    Extra keyword arguments become attributes of the new link;
    ``link_class`` and ``name`` are always set from the positional
    arguments, overriding any same-named keyword.
    """
    body = dict(link_attrs, link_class=link_class, name=link_name)
    create_request = api_client.links().create(body=body)
    return create_request.execute(num_retries=num_retries)
223 def docker_link_sort_key(link):
224 """Build a sort key to find the latest available Docker image.
226 To find one source collection for a Docker image referenced by
227 name or image id, the API server looks for a link with the most
228 recent `image_timestamp` property; then the most recent
229 `created_at` timestamp. This method generates a sort key for
230 Docker metadata links to sort them from least to most preferred.
# A missing or unparsable image_timestamp property falls back to the
# EARLIEST_DATETIME sentinel, so created_at (always present) decides.
233 image_timestamp = ciso8601.parse_datetime_unaware(
234 link['properties']['image_timestamp'])
235 except (KeyError, ValueError):
236 image_timestamp = EARLIEST_DATETIME
237 return (image_timestamp,
238 ciso8601.parse_datetime_unaware(link['created_at']))
240 def _get_docker_links(api_client, num_retries, **kwargs):
# Fetch every link matching the caller's list() kwargs (filters etc.),
# annotate each with its '_sort_key', and sort most-preferred first.
241 links = arvados.util.list_all(api_client.links().list,
242 num_retries, **kwargs)
244 link['_sort_key'] = docker_link_sort_key(link)
245 links.sort(key=itemgetter('_sort_key'), reverse=True)
248 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
# Build one image-listing dict from a docker metadata link. When the
# link's image_timestamp was missing (sort key element 0 is the
# EARLIEST_DATETIME sentinel), report created_at as the timestamp instead.
249 timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
251 '_sort_key': link['_sort_key'],
252 'timestamp': link['_sort_key'][timestamp_index],
253 'collection': link['head_uuid'],
254 'dockerhash': dockerhash,
259 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
260 """List all Docker images known to the api_client with image_name and
261 image_tag. If no image_name is given, defaults to listing all
264 Returns a list of tuples representing matching Docker images,
265 sorted in preference order (i.e. the first collection in the list
266 is the one that the API server would use). Each tuple is a
267 (collection_uuid, collection_info) pair, where collection_info is
268 a dict with fields "dockerhash", "repo", "tag", and "timestamp".
275 # Find images with the name the user specified.
276 search_links = _get_docker_links(
277 api_client, num_retries,
278 filters=[['link_class', '=', 'docker_image_repo+tag'],
280 '{}:{}'.format(image_name, image_tag or 'latest')]])
282 repo_links = search_links
284 # Fall back to finding images with the specified image hash.
285 search_links = _get_docker_links(
286 api_client, num_retries,
287 filters=[['link_class', '=', 'docker_image_hash'],
288 ['name', 'ilike', image_name + '%']])
289 hash_links = search_links
290 # Only list information about images that were found in the search.
291 search_filters.append(['head_uuid', 'in',
292 [link['head_uuid'] for link in search_links]])
294 # It should be reasonable to expect that each collection only has one
295 # image hash (though there may be many links specifying this). Find
296 # the API server's most preferred image hash link for each collection.
297 if hash_links is None:
298 hash_links = _get_docker_links(
299 api_client, num_retries,
300 filters=search_filters + [['link_class', '=', 'docker_image_hash']])
# hash_links is sorted most-preferred first; iterating it reversed means
# the dict ends up holding the most-preferred link per collection.
301 hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
303 # Each collection may have more than one name (though again, one name
304 # may be specified more than once). Build an image listing from name
305 # tags, sorted by API server preference.
306 if repo_links is None:
307 repo_links = _get_docker_links(
308 api_client, num_retries,
309 filters=search_filters + [['link_class', '=',
310 'docker_image_repo+tag']])
# Deduplicate repeated repo+tag names per collection while preserving
# the preference ordering of repo_links.
311 seen_image_names = collections.defaultdict(set)
313 for link in repo_links:
314 collection_uuid = link['head_uuid']
315 if link['name'] in seen_image_names[collection_uuid]:
317 seen_image_names[collection_uuid].add(link['name'])
319 dockerhash = hash_link_map[collection_uuid]['name']
321 dockerhash = '<unknown>'
# Link names look like 'repo:tag'; split once so repo may contain colons.
322 name_parts = link['name'].split(':', 1)
323 images.append(_new_image_listing(link, dockerhash, *name_parts))
325 # Find any image hash links that did not have a corresponding name link,
326 # and add image listings for them, retaining the API server preference
328 images_start_size = len(images)
329 for collection_uuid, link in hash_link_map.items():
330 if not seen_image_names[collection_uuid]:
331 images.append(_new_image_listing(link, link['name']))
# Only re-sort when unnamed images were appended out of order.
332 if len(images) > images_start_size:
333 images.sort(key=itemgetter('_sort_key'), reverse=True)
335 # Remove any image listings that refer to unknown collections.
336 existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
337 api_client.collections().list, num_retries,
338 filters=[['uuid', 'in', [im['collection'] for im in images]]],
340 return [(image['collection'], image) for image in images
341 if image['collection'] in existing_coll_uuids]
def items_owned_by(owner_uuid, arv_items):
    """Lazily yield the items whose 'owner_uuid' field equals owner_uuid."""
    return filter(lambda item: item['owner_uuid'] == owner_uuid, arv_items)
def _uuid2pdh(api, uuid):
    """Look up and return the portable_data_hash of the collection
    identified by uuid via the given API client."""
    listing = api.collections().list(
        filters=[['uuid', '=', uuid]],
        select=['portable_data_hash'],
    ).execute()
    return listing['items'][0]['portable_data_hash']
352 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
# Entry point for arv-keepdocker: either list Docker images already in
# Arvados, or dump a local image with `docker save` and upload it via
# arv-put, recording docker_image_hash / docker_image_repo+tag links.
353 args = arg_parser.parse_args(arguments)
355 api = arvados.api('v1')
# "images" (or no image argument) means list-only mode: print a table of
# images known to the API server and return.
357 if args.image is None or args.image == 'images':
358 fmt = "{:30} {:10} {:12} {:29} {:20}\n"
359 stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
361 for i, j in list_images_in_arv(api, args.retries):
362 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
# EPIPE here just means the reader (e.g. `head`) went away early.
364 if e.errno == errno.EPIPE:
# Split a trailing ':tag' out of the image argument, unless a separate
# tag argument was also supplied (which would conflict).
370 if re.search(r':\w[-.\w]{0,127}$', args.image):
371 # image ends with :valid-tag
372 if args.tag is not None:
374 "image %r already includes a tag, cannot add tag argument %r",
375 args.image, args.tag)
377 # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
378 args.image, args.tag = args.image.rsplit(':', 1)
379 elif args.tag is None:
382 # Pull the image if requested, unless the image is specified as a hash
383 # that we already have.
384 if args.pull and not find_image_hashes(args.image):
385 pull_image(args.image, args.tag)
388 image_hash = find_one_image_hash(args.image, args.tag)
389 except DockerError as error:
# NOTE(review): Exception has no .message attribute on Python 3 — this
# line looks like it should be str(error); confirm before relying on it.
390 logger.error(error.message)
393 if not docker_image_compatible(api, image_hash):
394 if args.force_image_format:
395 logger.warning("forcing incompatible image")
397 logger.error("refusing to store " \
398 "incompatible format (use --force-image-format to override)")
# image_repo_tag stays None when the user passed a bare hash (no repo:tag
# link will be created in that case).
401 image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
403 if args.name is None:
405 collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
407 collection_name = 'Docker image {}'.format(image_hash[0:12])
409 collection_name = args.name
411 # Acquire a lock so that only one arv-keepdocker process will
412 # dump/upload a particular docker image at a time. Do this before
413 # checking if the image already exists in Arvados so that if there
414 # is an upload already underway, when that upload completes and
415 # this process gets a turn, it will discover the Docker image is
416 # already available and exit quickly.
417 outfile_name = '{}.tar'.format(image_hash)
418 lockfile_name = '{}.lock'.format(outfile_name)
420 cache_dir = get_cache_dir()
422 lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
423 fcntl.flock(lockfile, fcntl.LOCK_EX)
427 # Check if this image is already in Arvados.
429 # Project where everything should be owned
430 parent_project_uuid = args.project_uuid or api.users().current().execute(
431 num_retries=args.retries)['uuid']
433 # Find image hash tags
434 existing_links = _get_docker_links(
436 filters=[['link_class', '=', 'docker_image_hash'],
437 ['name', '=', image_hash]])
439 # get readable collections
440 collections = api.collections().list(
441 filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
442 select=["uuid", "owner_uuid", "name", "manifest_text"]
443 ).execute(num_retries=args.retries)['items']
446 # check for repo+tag links on these collections
448 existing_repo_tag = _get_docker_links(
450 filters=[['link_class', '=', 'docker_image_repo+tag'],
451 ['name', '=', image_repo_tag],
452 ['head_uuid', 'in', [c["uuid"] for c in collections]]])
454 existing_repo_tag = []
# Reuse a collection already owned by the target project if one exists.
457 coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
458 except StopIteration:
459 # create new collection owned by the project
460 coll_uuid = api.collections().create(
461 body={"manifest_text": collections[0]['manifest_text'],
462 "name": collection_name,
463 "owner_uuid": parent_project_uuid},
464 ensure_unique_name=True
465 ).execute(num_retries=args.retries)['uuid']
467 link_base = {'owner_uuid': parent_project_uuid,
468 'head_uuid': coll_uuid,
469 'properties': existing_links[0]['properties']}
471 if not any(items_owned_by(parent_project_uuid, existing_links)):
472 # create image link owned by the project
473 make_link(api, args.retries,
474 'docker_image_hash', image_hash, **link_base)
476 if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
477 # create repo+tag link owned by the project
478 make_link(api, args.retries, 'docker_image_repo+tag',
479 image_repo_tag, **link_base)
481 stdout.write(coll_uuid + "\n")
485 # Open a file for the saved image, and write it if needed.
486 image_file, need_save = prep_image_file(outfile_name)
488 save_image(image_hash, image_file)
490 # Call arv-put with switches we inherited from it
491 # (a.k.a., switches that aren't our own).
492 put_args = keepdocker_parser.parse_known_args(arguments)[1]
494 if args.name is None:
495 put_args += ['--name', collection_name]
497 coll_uuid = arv_put.main(
498 put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
499 install_sig_handlers=install_sig_handlers).strip()
501 # Read the image metadata and make Arvados links from it.
503 image_tar = tarfile.open(fileobj=image_file)
# v2 hashes look like 'algo:hex'; the metadata member name differs
# between the v2 ('<hash>.json') and v1 ('<hash>/json') tar layouts.
504 image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
506 json_filename = raw_image_hash + '.json'
508 json_filename = raw_image_hash + '/json'
509 json_file = image_tar.extractfile(image_tar.getmember(json_filename))
510 image_metadata = json.loads(json_file.read().decode())
513 link_base = {'head_uuid': coll_uuid, 'properties': {}}
514 if 'created' in image_metadata:
515 link_base['properties']['image_timestamp'] = image_metadata['created']
516 if args.project_uuid is not None:
517 link_base['owner_uuid'] = args.project_uuid
519 make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
521 make_link(api, args.retries,
522 'docker_image_repo+tag', image_repo_tag, **link_base)
# Cleanup: remove the dumped tar and its stat cache; a missing file
# (ENOENT) is fine, anything else propagates.
526 for filename in [stat_cache_name(image_file), image_file.name]:
529 except OSError as error:
530 if error.errno != errno.ENOENT:
533 if lockfile is not None:
534 # Closing the lockfile unlocks it.
537 if __name__ == '__main__':