13111: Merge branch 'master' into 12308-go-fuse
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import subprocess
14 import sys
15 import tarfile
16 import tempfile
17 import shutil
18 import _strptime
19
20 from operator import itemgetter
21 from stat import *
22
23 import arvados
24 import arvados.util
25 import arvados.commands._util as arv_cmd
26 import arvados.commands.put as arv_put
27 from arvados.collection import CollectionReader
28 import ciso8601
29 import logging
30 import arvados.config
31
32 from arvados._version import __version__
33
34 logger = logging.getLogger('arvados.keepdocker')
35 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
36                 else logging.INFO)
37
38 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
39 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
40
41 DockerImage = collections.namedtuple(
42     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
43
44 keepdocker_parser = argparse.ArgumentParser(add_help=False)
45 keepdocker_parser.add_argument(
46     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
47     help='Print version and exit.')
48 keepdocker_parser.add_argument(
49     '-f', '--force', action='store_true', default=False,
50     help="Re-upload the image even if it already exists on the server")
51 keepdocker_parser.add_argument(
52     '--force-image-format', action='store_true', default=False,
53     help="Proceed even if the image format is not supported by the server")
54
55 _group = keepdocker_parser.add_mutually_exclusive_group()
56 _group.add_argument(
57     '--pull', action='store_true', default=False,
58     help="Try to pull the latest image from Docker registry")
59 _group.add_argument(
60     '--no-pull', action='store_false', dest='pull',
61     help="Use locally installed image only, don't pull image from Docker registry (default)")
62
63 keepdocker_parser.add_argument(
64     'image', nargs='?',
65     help="Docker image to upload: repo, repo:tag, or hash")
66 keepdocker_parser.add_argument(
67     'tag', nargs='?',
68     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
69
70 # Combine keepdocker options listed above with run_opts options of arv-put.
71 # The options inherited from arv-put include --name, --project-uuid,
72 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
73 arg_parser = argparse.ArgumentParser(
74         description="Upload or list Docker images in Arvados",
75         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
76
77 class DockerError(Exception):
78     pass
79
80
81 def popen_docker(cmd, *args, **kwargs):
82     manage_stdin = ('stdin' not in kwargs)
83     kwargs.setdefault('stdin', subprocess.PIPE)
84     kwargs.setdefault('stdout', sys.stderr)
85     try:
86         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
87     except OSError:  # No docker.io in $PATH
88         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
89     if manage_stdin:
90         docker_proc.stdin.close()
91     return docker_proc
92
93 def check_docker(proc, description):
94     proc.wait()
95     if proc.returncode != 0:
96         raise DockerError("docker {} returned status code {}".
97                           format(description, proc.returncode))
98
99 def docker_image_format(image_hash):
100     """Return the registry format ('v1' or 'v2') of the given image."""
101     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
102                         stdout=subprocess.PIPE)
103     try:
104         image_id = next(cmd.stdout).decode().strip()
105         if image_id.startswith('sha256:'):
106             return 'v2'
107         elif ':' not in image_id:
108             return 'v1'
109         else:
110             return 'unknown'
111     finally:
112         check_docker(cmd, "inspect")
113
114 def docker_image_compatible(api, image_hash):
115     supported = api._rootDesc.get('dockerImageFormats', [])
116     if not supported:
117         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
118         return False
119
120     fmt = docker_image_format(image_hash)
121     if fmt in supported:
122         return True
123     else:
124         logger.error("image format is {!r} " \
125             "but server supports only {!r}".format(fmt, supported))
126         return False
127
128 def docker_images():
129     # Yield a DockerImage tuple for each installed image.
130     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
131     list_output = iter(list_proc.stdout)
132     next(list_output)  # Ignore the header line
133     for line in list_output:
134         words = line.split()
135         size_index = len(words) - 2
136         repo, tag, imageid = words[:3]
137         ctime = ' '.join(words[3:size_index])
138         vsize = ' '.join(words[size_index:])
139         yield DockerImage(repo, tag, imageid, ctime, vsize)
140     list_proc.stdout.close()
141     check_docker(list_proc, "images")
142
143 def find_image_hashes(image_search, image_tag=None):
144     # Given one argument, search for Docker images with matching hashes,
145     # and return their full hashes in a set.
146     # Given two arguments, also search for a Docker image with the
147     # same repository and tag.  If one is found, return its hash in a
148     # set; otherwise, fall back to the one-argument hash search.
149     # Returns None if no match is found, or a hash search is ambiguous.
150     hash_search = image_search.lower()
151     hash_matches = set()
152     for image in docker_images():
153         if (image.repo == image_search) and (image.tag == image_tag):
154             return set([image.hash])
155         elif image.hash.startswith(hash_search):
156             hash_matches.add(image.hash)
157     return hash_matches
158
159 def find_one_image_hash(image_search, image_tag=None):
160     hashes = find_image_hashes(image_search, image_tag)
161     hash_count = len(hashes)
162     if hash_count == 1:
163         return hashes.pop()
164     elif hash_count == 0:
165         raise DockerError("no matching image found")
166     else:
167         raise DockerError("{} images match {}".format(hash_count, image_search))
168
169 def stat_cache_name(image_file):
170     return getattr(image_file, 'name', image_file) + '.stat'
171
172 def pull_image(image_name, image_tag):
173     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
174                  "pull")
175
176 def save_image(image_hash, image_file):
177     # Save the specified Docker image to image_file, then try to save its
178     # stats so we can try to resume after interruption.
179     check_docker(popen_docker(['save', image_hash], stdout=image_file),
180                  "save")
181     image_file.flush()
182     try:
183         with open(stat_cache_name(image_file), 'w') as statfile:
184             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
185     except STAT_CACHE_ERRORS:
186         pass  # We won't resume from this cache.  No big deal.
187
188 def prep_image_file(filename):
189     # Return a file object ready to save a Docker image,
190     # and a boolean indicating whether or not we need to actually save the
191     # image (False if a cached save is available).
192     cache_dir = arv_cmd.make_home_conf_dir(
193         os.path.join('.cache', 'arvados', 'docker'), 0o700)
194     if cache_dir is None:
195         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
196         need_save = True
197     else:
198         file_path = os.path.join(cache_dir, filename)
199         try:
200             with open(stat_cache_name(file_path)) as statfile:
201                 prev_stat = json.load(statfile)
202             now_stat = os.stat(file_path)
203             need_save = any(prev_stat[field] != now_stat[field]
204                             for field in [ST_MTIME, ST_SIZE])
205         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
206             need_save = True  # We couldn't compare against old stats
207         image_file = open(file_path, 'w+b' if need_save else 'rb')
208     return image_file, need_save
209
210 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
211     link_attrs.update({'link_class': link_class, 'name': link_name})
212     return api_client.links().create(body=link_attrs).execute(
213         num_retries=num_retries)
214
215 def docker_link_sort_key(link):
216     """Build a sort key to find the latest available Docker image.
217
218     To find one source collection for a Docker image referenced by
219     name or image id, the API server looks for a link with the most
220     recent `image_timestamp` property; then the most recent
221     `created_at` timestamp.  This method generates a sort key for
222     Docker metadata links to sort them from least to most preferred.
223     """
224     try:
225         image_timestamp = ciso8601.parse_datetime_unaware(
226             link['properties']['image_timestamp'])
227     except (KeyError, ValueError):
228         image_timestamp = EARLIEST_DATETIME
229     return (image_timestamp,
230             ciso8601.parse_datetime_unaware(link['created_at']))
231
232 def _get_docker_links(api_client, num_retries, **kwargs):
233     links = arvados.util.list_all(api_client.links().list,
234                                   num_retries, **kwargs)
235     for link in links:
236         link['_sort_key'] = docker_link_sort_key(link)
237     links.sort(key=itemgetter('_sort_key'), reverse=True)
238     return links
239
240 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
241     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
242     return {
243         '_sort_key': link['_sort_key'],
244         'timestamp': link['_sort_key'][timestamp_index],
245         'collection': link['head_uuid'],
246         'dockerhash': dockerhash,
247         'repo': repo,
248         'tag': tag,
249         }
250
251 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
252     """List all Docker images known to the api_client with image_name and
253     image_tag.  If no image_name is given, defaults to listing all
254     Docker images.
255
256     Returns a list of tuples representing matching Docker images,
257     sorted in preference order (i.e. the first collection in the list
258     is the one that the API server would use). Each tuple is a
259     (collection_uuid, collection_info) pair, where collection_info is
260     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
261
262     """
263     search_filters = []
264     repo_links = None
265     hash_links = None
266     if image_name:
267         # Find images with the name the user specified.
268         search_links = _get_docker_links(
269             api_client, num_retries,
270             filters=[['link_class', '=', 'docker_image_repo+tag'],
271                      ['name', '=',
272                       '{}:{}'.format(image_name, image_tag or 'latest')]])
273         if search_links:
274             repo_links = search_links
275         else:
276             # Fall back to finding images with the specified image hash.
277             search_links = _get_docker_links(
278                 api_client, num_retries,
279                 filters=[['link_class', '=', 'docker_image_hash'],
280                          ['name', 'ilike', image_name + '%']])
281             hash_links = search_links
282         # Only list information about images that were found in the search.
283         search_filters.append(['head_uuid', 'in',
284                                [link['head_uuid'] for link in search_links]])
285
286     # It should be reasonable to expect that each collection only has one
287     # image hash (though there may be many links specifying this).  Find
288     # the API server's most preferred image hash link for each collection.
289     if hash_links is None:
290         hash_links = _get_docker_links(
291             api_client, num_retries,
292             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
293     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
294
295     # Each collection may have more than one name (though again, one name
296     # may be specified more than once).  Build an image listing from name
297     # tags, sorted by API server preference.
298     if repo_links is None:
299         repo_links = _get_docker_links(
300             api_client, num_retries,
301             filters=search_filters + [['link_class', '=',
302                                        'docker_image_repo+tag']])
303     seen_image_names = collections.defaultdict(set)
304     images = []
305     for link in repo_links:
306         collection_uuid = link['head_uuid']
307         if link['name'] in seen_image_names[collection_uuid]:
308             continue
309         seen_image_names[collection_uuid].add(link['name'])
310         try:
311             dockerhash = hash_link_map[collection_uuid]['name']
312         except KeyError:
313             dockerhash = '<unknown>'
314         name_parts = link['name'].split(':', 1)
315         images.append(_new_image_listing(link, dockerhash, *name_parts))
316
317     # Find any image hash links that did not have a corresponding name link,
318     # and add image listings for them, retaining the API server preference
319     # sorting.
320     images_start_size = len(images)
321     for collection_uuid, link in hash_link_map.items():
322         if not seen_image_names[collection_uuid]:
323             images.append(_new_image_listing(link, link['name']))
324     if len(images) > images_start_size:
325         images.sort(key=itemgetter('_sort_key'), reverse=True)
326
327     # Remove any image listings that refer to unknown collections.
328     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
329             api_client.collections().list, num_retries,
330             filters=[['uuid', 'in', [im['collection'] for im in images]]],
331             select=['uuid'])}
332     return [(image['collection'], image) for image in images
333             if image['collection'] in existing_coll_uuids]
334
335 def items_owned_by(owner_uuid, arv_items):
336     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
337
338 def _uuid2pdh(api, uuid):
339     return api.collections().list(
340         filters=[['uuid', '=', uuid]],
341         select=['portable_data_hash'],
342     ).execute()['items'][0]['portable_data_hash']
343
344 def main(arguments=None, stdout=sys.stdout):
345     args = arg_parser.parse_args(arguments)
346     api = arvados.api('v1')
347
348     if args.image is None or args.image == 'images':
349         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
350         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
351         try:
352             for i, j in list_images_in_arv(api, args.retries):
353                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
354         except IOError as e:
355             if e.errno == errno.EPIPE:
356                 pass
357             else:
358                 raise
359         sys.exit(0)
360
361     if re.search(r':\w[-.\w]{0,127}$', args.image):
362         # image ends with :valid-tag
363         if args.tag is not None:
364             logger.error(
365                 "image %r already includes a tag, cannot add tag argument %r",
366                 args.image, args.tag)
367             sys.exit(1)
368         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
369         args.image, args.tag = args.image.rsplit(':', 1)
370     elif args.tag is None:
371         args.tag = 'latest'
372
373     # Pull the image if requested, unless the image is specified as a hash
374     # that we already have.
375     if args.pull and not find_image_hashes(args.image):
376         pull_image(args.image, args.tag)
377
378     try:
379         image_hash = find_one_image_hash(args.image, args.tag)
380     except DockerError as error:
381         logger.error(error.message)
382         sys.exit(1)
383
384     if not docker_image_compatible(api, image_hash):
385         if args.force_image_format:
386             logger.warning("forcing incompatible image")
387         else:
388             logger.error("refusing to store " \
389                 "incompatible format (use --force-image-format to override)")
390             sys.exit(1)
391
392     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
393
394     if args.name is None:
395         if image_repo_tag:
396             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
397         else:
398             collection_name = 'Docker image {}'.format(image_hash[0:12])
399     else:
400         collection_name = args.name
401
402     if not args.force:
403         # Check if this image is already in Arvados.
404
405         # Project where everything should be owned
406         if args.project_uuid:
407             parent_project_uuid = args.project_uuid
408         else:
409             parent_project_uuid = api.users().current().execute(
410                 num_retries=args.retries)['uuid']
411
412         # Find image hash tags
413         existing_links = _get_docker_links(
414             api, args.retries,
415             filters=[['link_class', '=', 'docker_image_hash'],
416                      ['name', '=', image_hash]])
417         if existing_links:
418             # get readable collections
419             collections = api.collections().list(
420                 filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
421                 select=["uuid", "owner_uuid", "name", "manifest_text"]
422                 ).execute(num_retries=args.retries)['items']
423
424             if collections:
425                 # check for repo+tag links on these collections
426                 if image_repo_tag:
427                     existing_repo_tag = _get_docker_links(
428                         api, args.retries,
429                         filters=[['link_class', '=', 'docker_image_repo+tag'],
430                                  ['name', '=', image_repo_tag],
431                                  ['head_uuid', 'in', [c["uuid"] for c in collections]]])
432                 else:
433                     existing_repo_tag = []
434
435                 try:
436                     coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
437                 except StopIteration:
438                     # create new collection owned by the project
439                     coll_uuid = api.collections().create(
440                         body={"manifest_text": collections[0]['manifest_text'],
441                               "name": collection_name,
442                               "owner_uuid": parent_project_uuid},
443                         ensure_unique_name=True
444                         ).execute(num_retries=args.retries)['uuid']
445
446                 link_base = {'owner_uuid': parent_project_uuid,
447                              'head_uuid':  coll_uuid,
448                              'properties': existing_links[0]['properties']}
449
450                 if not any(items_owned_by(parent_project_uuid, existing_links)):
451                     # create image link owned by the project
452                     make_link(api, args.retries,
453                               'docker_image_hash', image_hash, **link_base)
454
455                 if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
456                     # create repo+tag link owned by the project
457                     make_link(api, args.retries, 'docker_image_repo+tag',
458                               image_repo_tag, **link_base)
459
460                 stdout.write(coll_uuid + "\n")
461
462                 sys.exit(0)
463
464     # Open a file for the saved image, and write it if needed.
465     outfile_name = '{}.tar'.format(image_hash)
466     image_file, need_save = prep_image_file(outfile_name)
467     if need_save:
468         save_image(image_hash, image_file)
469
470     # Call arv-put with switches we inherited from it
471     # (a.k.a., switches that aren't our own).
472     put_args = keepdocker_parser.parse_known_args(arguments)[1]
473
474     if args.name is None:
475         put_args += ['--name', collection_name]
476
477     coll_uuid = arv_put.main(
478         put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
479
480     # Read the image metadata and make Arvados links from it.
481     image_file.seek(0)
482     image_tar = tarfile.open(fileobj=image_file)
483     image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
484     if image_hash_type:
485         json_filename = raw_image_hash + '.json'
486     else:
487         json_filename = raw_image_hash + '/json'
488     json_file = image_tar.extractfile(image_tar.getmember(json_filename))
489     image_metadata = json.load(json_file)
490     json_file.close()
491     image_tar.close()
492     link_base = {'head_uuid': coll_uuid, 'properties': {}}
493     if 'created' in image_metadata:
494         link_base['properties']['image_timestamp'] = image_metadata['created']
495     if args.project_uuid is not None:
496         link_base['owner_uuid'] = args.project_uuid
497
498     make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
499     if image_repo_tag:
500         make_link(api, args.retries,
501                   'docker_image_repo+tag', image_repo_tag, **link_base)
502
503     # Clean up.
504     image_file.close()
505     for filename in [stat_cache_name(image_file), image_file.name]:
506         try:
507             os.unlink(filename)
508         except OSError as error:
509             if error.errno != errno.ENOENT:
510                 raise
511
512 if __name__ == '__main__':
513     main()