Merge branch '12134-llfuse-patch'
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import subprocess
14 import sys
15 import tarfile
16 import tempfile
17 import shutil
18 import _strptime
19
20 from operator import itemgetter
21 from stat import *
22
23 import arvados
24 import arvados.util
25 import arvados.commands._util as arv_cmd
26 import arvados.commands.put as arv_put
27 from arvados.collection import CollectionReader
28 import ciso8601
29 import logging
30 import arvados.config
31
32 from arvados._version import __version__
33
34 logger = logging.getLogger('arvados.keepdocker')
35 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
36                 else logging.INFO)
37
38 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
39 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
40
41 DockerImage = collections.namedtuple(
42     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
43
44 keepdocker_parser = argparse.ArgumentParser(add_help=False)
45 keepdocker_parser.add_argument(
46     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
47     help='Print version and exit.')
48 keepdocker_parser.add_argument(
49     '-f', '--force', action='store_true', default=False,
50     help="Re-upload the image even if it already exists on the server")
51 keepdocker_parser.add_argument(
52     '--force-image-format', action='store_true', default=False,
53     help="Proceed even if the image format is not supported by the server")
54
55 _group = keepdocker_parser.add_mutually_exclusive_group()
56 _group.add_argument(
57     '--pull', action='store_true', default=False,
58     help="Try to pull the latest image from Docker registry")
59 _group.add_argument(
60     '--no-pull', action='store_false', dest='pull',
61     help="Use locally installed image only, don't pull image from Docker registry (default)")
62
63 keepdocker_parser.add_argument(
64     'image', nargs='?',
65     help="Docker image to upload, as a repository name or hash")
66 keepdocker_parser.add_argument(
67     'tag', nargs='?', default='latest',
68     help="Tag of the Docker image to upload (default 'latest')")
69
70 # Combine keepdocker options listed above with run_opts options of arv-put.
71 # The options inherited from arv-put include --name, --project-uuid,
72 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
73 arg_parser = argparse.ArgumentParser(
74         description="Upload or list Docker images in Arvados",
75         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
76
77 class DockerError(Exception):
78     pass
79
80
81 def popen_docker(cmd, *args, **kwargs):
82     manage_stdin = ('stdin' not in kwargs)
83     kwargs.setdefault('stdin', subprocess.PIPE)
84     kwargs.setdefault('stdout', sys.stderr)
85     try:
86         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
87     except OSError:  # No docker.io in $PATH
88         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
89     if manage_stdin:
90         docker_proc.stdin.close()
91     return docker_proc
92
93 def check_docker(proc, description):
94     proc.wait()
95     if proc.returncode != 0:
96         raise DockerError("docker {} returned status code {}".
97                           format(description, proc.returncode))
98
99 def docker_image_format(image_hash):
100     """Return the registry format ('v1' or 'v2') of the given image."""
101     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
102                         stdout=subprocess.PIPE)
103     try:
104         image_id = next(cmd.stdout).decode().strip()
105         if image_id.startswith('sha256:'):
106             return 'v2'
107         elif ':' not in image_id:
108             return 'v1'
109         else:
110             return 'unknown'
111     finally:
112         check_docker(cmd, "inspect")
113
114 def docker_image_compatible(api, image_hash):
115     supported = api._rootDesc.get('dockerImageFormats', [])
116     if not supported:
117         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
118         return False
119
120     fmt = docker_image_format(image_hash)
121     if fmt in supported:
122         return True
123     else:
124         logger.error("image format is {!r} " \
125             "but server supports only {!r}".format(fmt, supported))
126         return False
127
128 def docker_images():
129     # Yield a DockerImage tuple for each installed image.
130     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
131     list_output = iter(list_proc.stdout)
132     next(list_output)  # Ignore the header line
133     for line in list_output:
134         words = line.split()
135         size_index = len(words) - 2
136         repo, tag, imageid = words[:3]
137         ctime = ' '.join(words[3:size_index])
138         vsize = ' '.join(words[size_index:])
139         yield DockerImage(repo, tag, imageid, ctime, vsize)
140     list_proc.stdout.close()
141     check_docker(list_proc, "images")
142
143 def find_image_hashes(image_search, image_tag=None):
144     # Given one argument, search for Docker images with matching hashes,
145     # and return their full hashes in a set.
146     # Given two arguments, also search for a Docker image with the
147     # same repository and tag.  If one is found, return its hash in a
148     # set; otherwise, fall back to the one-argument hash search.
149     # Returns None if no match is found, or a hash search is ambiguous.
150     hash_search = image_search.lower()
151     hash_matches = set()
152     for image in docker_images():
153         if (image.repo == image_search) and (image.tag == image_tag):
154             return set([image.hash])
155         elif image.hash.startswith(hash_search):
156             hash_matches.add(image.hash)
157     return hash_matches
158
159 def find_one_image_hash(image_search, image_tag=None):
160     hashes = find_image_hashes(image_search, image_tag)
161     hash_count = len(hashes)
162     if hash_count == 1:
163         return hashes.pop()
164     elif hash_count == 0:
165         raise DockerError("no matching image found")
166     else:
167         raise DockerError("{} images match {}".format(hash_count, image_search))
168
169 def stat_cache_name(image_file):
170     return getattr(image_file, 'name', image_file) + '.stat'
171
172 def pull_image(image_name, image_tag):
173     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
174                  "pull")
175
176 def save_image(image_hash, image_file):
177     # Save the specified Docker image to image_file, then try to save its
178     # stats so we can try to resume after interruption.
179     check_docker(popen_docker(['save', image_hash], stdout=image_file),
180                  "save")
181     image_file.flush()
182     try:
183         with open(stat_cache_name(image_file), 'w') as statfile:
184             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
185     except STAT_CACHE_ERRORS:
186         pass  # We won't resume from this cache.  No big deal.
187
188 def prep_image_file(filename):
189     # Return a file object ready to save a Docker image,
190     # and a boolean indicating whether or not we need to actually save the
191     # image (False if a cached save is available).
192     cache_dir = arv_cmd.make_home_conf_dir(
193         os.path.join('.cache', 'arvados', 'docker'), 0o700)
194     if cache_dir is None:
195         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
196         need_save = True
197     else:
198         file_path = os.path.join(cache_dir, filename)
199         try:
200             with open(stat_cache_name(file_path)) as statfile:
201                 prev_stat = json.load(statfile)
202             now_stat = os.stat(file_path)
203             need_save = any(prev_stat[field] != now_stat[field]
204                             for field in [ST_MTIME, ST_SIZE])
205         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
206             need_save = True  # We couldn't compare against old stats
207         image_file = open(file_path, 'w+b' if need_save else 'rb')
208     return image_file, need_save
209
210 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
211     link_attrs.update({'link_class': link_class, 'name': link_name})
212     return api_client.links().create(body=link_attrs).execute(
213         num_retries=num_retries)
214
215 def docker_link_sort_key(link):
216     """Build a sort key to find the latest available Docker image.
217
218     To find one source collection for a Docker image referenced by
219     name or image id, the API server looks for a link with the most
220     recent `image_timestamp` property; then the most recent
221     `created_at` timestamp.  This method generates a sort key for
222     Docker metadata links to sort them from least to most preferred.
223     """
224     try:
225         image_timestamp = ciso8601.parse_datetime_unaware(
226             link['properties']['image_timestamp'])
227     except (KeyError, ValueError):
228         image_timestamp = EARLIEST_DATETIME
229     return (image_timestamp,
230             ciso8601.parse_datetime_unaware(link['created_at']))
231
232 def _get_docker_links(api_client, num_retries, **kwargs):
233     links = arvados.util.list_all(api_client.links().list,
234                                   num_retries, **kwargs)
235     for link in links:
236         link['_sort_key'] = docker_link_sort_key(link)
237     links.sort(key=itemgetter('_sort_key'), reverse=True)
238     return links
239
240 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
241     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
242     return {
243         '_sort_key': link['_sort_key'],
244         'timestamp': link['_sort_key'][timestamp_index],
245         'collection': link['head_uuid'],
246         'dockerhash': dockerhash,
247         'repo': repo,
248         'tag': tag,
249         }
250
251 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
252     """List all Docker images known to the api_client with image_name and
253     image_tag.  If no image_name is given, defaults to listing all
254     Docker images.
255
256     Returns a list of tuples representing matching Docker images,
257     sorted in preference order (i.e. the first collection in the list
258     is the one that the API server would use). Each tuple is a
259     (collection_uuid, collection_info) pair, where collection_info is
260     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
261
262     """
263     search_filters = []
264     repo_links = None
265     hash_links = None
266     if image_name:
267         # Find images with the name the user specified.
268         search_links = _get_docker_links(
269             api_client, num_retries,
270             filters=[['link_class', '=', 'docker_image_repo+tag'],
271                      ['name', '=',
272                       '{}:{}'.format(image_name, image_tag or 'latest')]])
273         if search_links:
274             repo_links = search_links
275         else:
276             # Fall back to finding images with the specified image hash.
277             search_links = _get_docker_links(
278                 api_client, num_retries,
279                 filters=[['link_class', '=', 'docker_image_hash'],
280                          ['name', 'ilike', image_name + '%']])
281             hash_links = search_links
282         # Only list information about images that were found in the search.
283         search_filters.append(['head_uuid', 'in',
284                                [link['head_uuid'] for link in search_links]])
285
286     # It should be reasonable to expect that each collection only has one
287     # image hash (though there may be many links specifying this).  Find
288     # the API server's most preferred image hash link for each collection.
289     if hash_links is None:
290         hash_links = _get_docker_links(
291             api_client, num_retries,
292             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
293     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
294
295     # Each collection may have more than one name (though again, one name
296     # may be specified more than once).  Build an image listing from name
297     # tags, sorted by API server preference.
298     if repo_links is None:
299         repo_links = _get_docker_links(
300             api_client, num_retries,
301             filters=search_filters + [['link_class', '=',
302                                        'docker_image_repo+tag']])
303     seen_image_names = collections.defaultdict(set)
304     images = []
305     for link in repo_links:
306         collection_uuid = link['head_uuid']
307         if link['name'] in seen_image_names[collection_uuid]:
308             continue
309         seen_image_names[collection_uuid].add(link['name'])
310         try:
311             dockerhash = hash_link_map[collection_uuid]['name']
312         except KeyError:
313             dockerhash = '<unknown>'
314         name_parts = link['name'].split(':', 1)
315         images.append(_new_image_listing(link, dockerhash, *name_parts))
316
317     # Find any image hash links that did not have a corresponding name link,
318     # and add image listings for them, retaining the API server preference
319     # sorting.
320     images_start_size = len(images)
321     for collection_uuid, link in hash_link_map.items():
322         if not seen_image_names[collection_uuid]:
323             images.append(_new_image_listing(link, link['name']))
324     if len(images) > images_start_size:
325         images.sort(key=itemgetter('_sort_key'), reverse=True)
326
327     # Remove any image listings that refer to unknown collections.
328     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
329             api_client.collections().list, num_retries,
330             filters=[['uuid', 'in', [im['collection'] for im in images]]],
331             select=['uuid'])}
332     return [(image['collection'], image) for image in images
333             if image['collection'] in existing_coll_uuids]
334
335 def items_owned_by(owner_uuid, arv_items):
336     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
337
338 def _uuid2pdh(api, uuid):
339     return api.collections().list(
340         filters=[['uuid', '=', uuid]],
341         select=['portable_data_hash'],
342     ).execute()['items'][0]['portable_data_hash']
343
344 def main(arguments=None, stdout=sys.stdout):
345     args = arg_parser.parse_args(arguments)
346     api = arvados.api('v1')
347
348     if args.image is None or args.image == 'images':
349         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
350         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
351         try:
352             for i, j in list_images_in_arv(api, args.retries):
353                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
354         except IOError as e:
355             if e.errno == errno.EPIPE:
356                 pass
357             else:
358                 raise
359         sys.exit(0)
360
361     # Pull the image if requested, unless the image is specified as a hash
362     # that we already have.
363     if args.pull and not find_image_hashes(args.image):
364         pull_image(args.image, args.tag)
365
366     try:
367         image_hash = find_one_image_hash(args.image, args.tag)
368     except DockerError as error:
369         logger.error(error.message)
370         sys.exit(1)
371
372     if not docker_image_compatible(api, image_hash):
373         if args.force_image_format:
374             logger.warning("forcing incompatible image")
375         else:
376             logger.error("refusing to store " \
377                 "incompatible format (use --force-image-format to override)")
378             sys.exit(1)
379
380     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
381
382     if args.name is None:
383         if image_repo_tag:
384             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
385         else:
386             collection_name = 'Docker image {}'.format(image_hash[0:12])
387     else:
388         collection_name = args.name
389
390     if not args.force:
391         # Check if this image is already in Arvados.
392
393         # Project where everything should be owned
394         if args.project_uuid:
395             parent_project_uuid = args.project_uuid
396         else:
397             parent_project_uuid = api.users().current().execute(
398                 num_retries=args.retries)['uuid']
399
400         # Find image hash tags
401         existing_links = _get_docker_links(
402             api, args.retries,
403             filters=[['link_class', '=', 'docker_image_hash'],
404                      ['name', '=', image_hash]])
405         if existing_links:
406             # get readable collections
407             collections = api.collections().list(
408                 filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
409                 select=["uuid", "owner_uuid", "name", "manifest_text"]
410                 ).execute(num_retries=args.retries)['items']
411
412             if collections:
413                 # check for repo+tag links on these collections
414                 if image_repo_tag:
415                     existing_repo_tag = _get_docker_links(
416                         api, args.retries,
417                         filters=[['link_class', '=', 'docker_image_repo+tag'],
418                                  ['name', '=', image_repo_tag],
419                                  ['head_uuid', 'in', [c["uuid"] for c in collections]]])
420                 else:
421                     existing_repo_tag = []
422
423                 try:
424                     coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
425                 except StopIteration:
426                     # create new collection owned by the project
427                     coll_uuid = api.collections().create(
428                         body={"manifest_text": collections[0]['manifest_text'],
429                               "name": collection_name,
430                               "owner_uuid": parent_project_uuid},
431                         ensure_unique_name=True
432                         ).execute(num_retries=args.retries)['uuid']
433
434                 link_base = {'owner_uuid': parent_project_uuid,
435                              'head_uuid':  coll_uuid,
436                              'properties': existing_links[0]['properties']}
437
438                 if not any(items_owned_by(parent_project_uuid, existing_links)):
439                     # create image link owned by the project
440                     make_link(api, args.retries,
441                               'docker_image_hash', image_hash, **link_base)
442
443                 if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
444                     # create repo+tag link owned by the project
445                     make_link(api, args.retries, 'docker_image_repo+tag',
446                               image_repo_tag, **link_base)
447
448                 stdout.write(coll_uuid + "\n")
449
450                 sys.exit(0)
451
452     # Open a file for the saved image, and write it if needed.
453     outfile_name = '{}.tar'.format(image_hash)
454     image_file, need_save = prep_image_file(outfile_name)
455     if need_save:
456         save_image(image_hash, image_file)
457
458     # Call arv-put with switches we inherited from it
459     # (a.k.a., switches that aren't our own).
460     put_args = keepdocker_parser.parse_known_args(arguments)[1]
461
462     if args.name is None:
463         put_args += ['--name', collection_name]
464
465     coll_uuid = arv_put.main(
466         put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
467
468     # Read the image metadata and make Arvados links from it.
469     image_file.seek(0)
470     image_tar = tarfile.open(fileobj=image_file)
471     image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
472     if image_hash_type:
473         json_filename = raw_image_hash + '.json'
474     else:
475         json_filename = raw_image_hash + '/json'
476     json_file = image_tar.extractfile(image_tar.getmember(json_filename))
477     image_metadata = json.load(json_file)
478     json_file.close()
479     image_tar.close()
480     link_base = {'head_uuid': coll_uuid, 'properties': {}}
481     if 'created' in image_metadata:
482         link_base['properties']['image_timestamp'] = image_metadata['created']
483     if args.project_uuid is not None:
484         link_base['owner_uuid'] = args.project_uuid
485
486     make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
487     if image_repo_tag:
488         make_link(api, args.retries,
489                   'docker_image_repo+tag', image_repo_tag, **link_base)
490
491     # Clean up.
492     image_file.close()
493     for filename in [stat_cache_name(image_file), image_file.name]:
494         try:
495             os.unlink(filename)
496         except OSError as error:
497             if error.errno != errno.ENOENT:
498                 raise
499
500 if __name__ == '__main__':
501     main()