22126: Improve formatting of method parameter docstrings
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import collections
7 import datetime
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import subprocess
15 import sys
16 import tarfile
17 import tempfile
18
19 import ciso8601
20 from operator import itemgetter
21 from pathlib import Path
22 from stat import *
23
24 import arvados
25 import arvados.config
26 import arvados.util
27 import arvados.commands._util as arv_cmd
28 import arvados.commands.put as arv_put
29
30 from arvados._internal import basedirs
31 from arvados._version import __version__
32 from typing import (
33     Callable,
34 )
35
36 logger = logging.getLogger('arvados.keepdocker')
37 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
38                 else logging.INFO)
39
40 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
41 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
42
43 DockerImage = collections.namedtuple(
44     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
45
46 keepdocker_parser = argparse.ArgumentParser(add_help=False)
47 keepdocker_parser.add_argument(
48     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
49     help='Print version and exit.')
50 keepdocker_parser.add_argument(
51     '-f', '--force', action='store_true', default=False,
52     help="Re-upload the image even if it already exists on the server")
53 keepdocker_parser.add_argument(
54     '--force-image-format', action='store_true', default=False,
55     help="Proceed even if the image format is not supported by the server")
56
57 _group = keepdocker_parser.add_mutually_exclusive_group()
58 _group.add_argument(
59     '--pull', action='store_true', default=False,
60     help="Try to pull the latest image from Docker registry")
61 _group.add_argument(
62     '--no-pull', action='store_false', dest='pull',
63     help="Use locally installed image only, don't pull image from Docker registry (default)")
64
65 # Combine keepdocker options listed above with run_opts options of arv-put.
66 # The options inherited from arv-put include --name, --project-uuid,
67 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
68 arg_parser = argparse.ArgumentParser(
69         description="Upload or list Docker images in Arvados",
70         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
71
72 arg_parser.add_argument(
73     'image', nargs='?',
74     help="Docker image to upload: repo, repo:tag, or hash")
75 arg_parser.add_argument(
76     'tag', nargs='?',
77     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
78
79 class DockerError(Exception):
80     pass
81
82
83 def popen_docker(cmd, *args, **kwargs):
84     manage_stdin = ('stdin' not in kwargs)
85     kwargs.setdefault('stdin', subprocess.PIPE)
86     kwargs.setdefault('stdout', subprocess.PIPE)
87     kwargs.setdefault('stderr', subprocess.PIPE)
88     try:
89         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
90     except OSError:  # No docker in $PATH, try docker.io
91         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
92     if manage_stdin:
93         docker_proc.stdin.close()
94     return docker_proc
95
96 def check_docker(proc, description):
97     proc.wait()
98     if proc.returncode != 0:
99         raise DockerError("docker {} returned status code {}".
100                           format(description, proc.returncode))
101
102 def docker_image_format(image_hash):
103     """Return the registry format ('v1' or 'v2') of the given image."""
104     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
105                         stdout=subprocess.PIPE)
106     try:
107         image_id = next(cmd.stdout).decode('utf-8').strip()
108         if image_id.startswith('sha256:'):
109             return 'v2'
110         elif ':' not in image_id:
111             return 'v1'
112         else:
113             return 'unknown'
114     finally:
115         check_docker(cmd, "inspect")
116
117 def docker_image_compatible(api, image_hash):
118     supported = api._rootDesc.get('dockerImageFormats', [])
119     if not supported:
120         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
121         return False
122
123     fmt = docker_image_format(image_hash)
124     if fmt in supported:
125         return True
126     else:
127         logger.error("image format is {!r} " \
128             "but server supports only {!r}".format(fmt, supported))
129         return False
130
131 def docker_images():
132     # Yield a DockerImage tuple for each installed image.
133     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
134     list_output = iter(list_proc.stdout)
135     next(list_output)  # Ignore the header line
136     for line in list_output:
137         words = line.split()
138         words = [word.decode('utf-8') for word in words]
139         size_index = len(words) - 2
140         repo, tag, imageid = words[:3]
141         ctime = ' '.join(words[3:size_index])
142         vsize = ' '.join(words[size_index:])
143         yield DockerImage(repo, tag, imageid, ctime, vsize)
144     list_proc.stdout.close()
145     check_docker(list_proc, "images")
146
147 def find_image_hashes(image_search, image_tag=None):
148     # Query for a Docker images with the repository and tag and return
149     # the image ids in a list.  Returns empty list if no match is
150     # found.
151
152     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
153
154     inspect = list_proc.stdout.read()
155     list_proc.stdout.close()
156
157     imageinfo = json.loads(inspect)
158
159     return [i["Id"] for i in imageinfo]
160
161 def find_one_image_hash(image_search, image_tag=None):
162     hashes = find_image_hashes(image_search, image_tag)
163     hash_count = len(hashes)
164     if hash_count == 1:
165         return hashes.pop()
166     elif hash_count == 0:
167         raise DockerError("no matching image found")
168     else:
169         raise DockerError("{} images match {}".format(hash_count, image_search))
170
171 def stat_cache_name(image_file):
172     return getattr(image_file, 'name', image_file) + '.stat'
173
174 def pull_image(image_name, image_tag):
175     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
176                  "pull")
177
178 def save_image(image_hash, image_file):
179     # Save the specified Docker image to image_file, then try to save its
180     # stats so we can try to resume after interruption.
181     check_docker(popen_docker(['save', image_hash], stdout=image_file),
182                  "save")
183     image_file.flush()
184     try:
185         with open(stat_cache_name(image_file), 'w') as statfile:
186             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
187     except STAT_CACHE_ERRORS:
188         pass  # We won't resume from this cache.  No big deal.
189
190 def get_cache_dir(
191         mkparent: Callable[[], Path]=basedirs.BaseDirectories('CACHE').storage_path,
192 ) -> str:
193     path = mkparent() / 'docker'
194     path.mkdir(mode=0o700, exist_ok=True)
195     return str(path)
196
197 def prep_image_file(filename):
198     # Return a file object ready to save a Docker image,
199     # and a boolean indicating whether or not we need to actually save the
200     # image (False if a cached save is available).
201     cache_dir = get_cache_dir()
202     if cache_dir is None:
203         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
204         need_save = True
205     else:
206         file_path = os.path.join(cache_dir, filename)
207         try:
208             with open(stat_cache_name(file_path)) as statfile:
209                 prev_stat = json.load(statfile)
210             now_stat = os.stat(file_path)
211             need_save = any(prev_stat[field] != now_stat[field]
212                             for field in [ST_MTIME, ST_SIZE])
213         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
214             need_save = True  # We couldn't compare against old stats
215         image_file = open(file_path, 'w+b' if need_save else 'rb')
216     return image_file, need_save
217
218 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
219     link_attrs.update({'link_class': link_class, 'name': link_name})
220     return api_client.links().create(body=link_attrs).execute(
221         num_retries=num_retries)
222
223 def docker_link_sort_key(link):
224     """Build a sort key to find the latest available Docker image.
225
226     To find one source collection for a Docker image referenced by
227     name or image id, the API server looks for a link with the most
228     recent `image_timestamp` property; then the most recent
229     `created_at` timestamp.  This method generates a sort key for
230     Docker metadata links to sort them from least to most preferred.
231     """
232     try:
233         image_timestamp = ciso8601.parse_datetime_as_naive(
234             link['properties']['image_timestamp'])
235     except (KeyError, ValueError):
236         image_timestamp = EARLIEST_DATETIME
237     try:
238         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
239     except ValueError:
240         created_timestamp = None
241     return (image_timestamp, created_timestamp)
242
243 def _get_docker_links(api_client, num_retries, **kwargs):
244     links = list(arvados.util.keyset_list_all(
245         api_client.links().list, num_retries=num_retries, **kwargs,
246     ))
247     for link in links:
248         link['_sort_key'] = docker_link_sort_key(link)
249     links.sort(key=itemgetter('_sort_key'), reverse=True)
250     return links
251
252 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
253     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
254     return {
255         '_sort_key': link['_sort_key'],
256         'timestamp': link['_sort_key'][timestamp_index],
257         'collection': link['head_uuid'],
258         'dockerhash': dockerhash,
259         'repo': repo,
260         'tag': tag,
261         }
262
263 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
264     """List all Docker images known to the api_client with image_name and
265     image_tag.  If no image_name is given, defaults to listing all
266     Docker images.
267
268     Returns a list of tuples representing matching Docker images,
269     sorted in preference order (i.e. the first collection in the list
270     is the one that the API server would use). Each tuple is a
271     (collection_uuid, collection_info) pair, where collection_info is
272     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
273
274     """
275     search_filters = []
276     repo_links = None
277     hash_links = None
278
279     project_filter = []
280     if project_uuid is not None:
281         project_filter = [["owner_uuid", "=", project_uuid]]
282
283     if image_name:
284         # Find images with the name the user specified.
285         search_links = _get_docker_links(
286             api_client, num_retries,
287             filters=[['link_class', '=', 'docker_image_repo+tag'],
288                      ['name', '=',
289                       '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
290         if search_links:
291             repo_links = search_links
292         else:
293             # Fall back to finding images with the specified image hash.
294             search_links = _get_docker_links(
295                 api_client, num_retries,
296                 filters=[['link_class', '=', 'docker_image_hash'],
297                          ['name', 'ilike', image_name + '%']]+project_filter)
298             hash_links = search_links
299         # Only list information about images that were found in the search.
300         search_filters.append(['head_uuid', 'in',
301                                [link['head_uuid'] for link in search_links]])
302
303     # It should be reasonable to expect that each collection only has one
304     # image hash (though there may be many links specifying this).  Find
305     # the API server's most preferred image hash link for each collection.
306     if hash_links is None:
307         hash_links = _get_docker_links(
308             api_client, num_retries,
309             filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
310     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
311
312     # Each collection may have more than one name (though again, one name
313     # may be specified more than once).  Build an image listing from name
314     # tags, sorted by API server preference.
315     if repo_links is None:
316         repo_links = _get_docker_links(
317             api_client, num_retries,
318             filters=search_filters + [['link_class', '=',
319                                        'docker_image_repo+tag']]+project_filter)
320     seen_image_names = collections.defaultdict(set)
321     images = []
322     for link in repo_links:
323         collection_uuid = link['head_uuid']
324         if link['name'] in seen_image_names[collection_uuid]:
325             continue
326         seen_image_names[collection_uuid].add(link['name'])
327         try:
328             dockerhash = hash_link_map[collection_uuid]['name']
329         except KeyError:
330             dockerhash = '<unknown>'
331         name_parts = link['name'].rsplit(':', 1)
332         images.append(_new_image_listing(link, dockerhash, *name_parts))
333
334     # Find any image hash links that did not have a corresponding name link,
335     # and add image listings for them, retaining the API server preference
336     # sorting.
337     images_start_size = len(images)
338     for collection_uuid, link in hash_link_map.items():
339         if not seen_image_names[collection_uuid]:
340             images.append(_new_image_listing(link, link['name']))
341     if len(images) > images_start_size:
342         images.sort(key=itemgetter('_sort_key'), reverse=True)
343
344     # Remove any image listings that refer to unknown collections.
345     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
346         api_client.collections().list,
347         num_retries=num_retries,
348         filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
349         select=['uuid'],
350     )}
351     return [(image['collection'], image) for image in images
352             if image['collection'] in existing_coll_uuids]
353
354 def items_owned_by(owner_uuid, arv_items):
355     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
356
357 def _uuid2pdh(api, uuid):
358     return api.collections().list(
359         filters=[['uuid', '=', uuid]],
360         select=['portable_data_hash'],
361     ).execute()['items'][0]['portable_data_hash']
362
363 def load_image_metadata(image_file):
364     """Load an image manifest and config from an archive
365
366     Given an image archive as an open binary file object, this function loads
367     the image manifest and configuration, deserializing each from JSON and
368     returning them in a 2-tuple of dicts.
369     """
370     image_file.seek(0)
371     with tarfile.open(fileobj=image_file) as image_tar:
372         with image_tar.extractfile('manifest.json') as manifest_file:
373             image_manifest_list = json.load(manifest_file)
374         # Because arv-keepdocker only saves one image, there should only be
375         # one manifest.  This extracts that from the list and raises
376         # ValueError if there's not exactly one.
377         image_manifest, = image_manifest_list
378         with image_tar.extractfile(image_manifest['Config']) as config_file:
379             image_config = json.load(config_file)
380     return image_manifest, image_config
381
382 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
383     args = arg_parser.parse_args(arguments)
384     if api is None:
385         api = arvados.api('v1', num_retries=args.retries)
386
387     if args.image is None or args.image == 'images':
388         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
389         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
390         try:
391             for i, j in list_images_in_arv(api, args.retries):
392                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
393         except IOError as e:
394             if e.errno == errno.EPIPE:
395                 pass
396             else:
397                 raise
398         sys.exit(0)
399
400     if re.search(r':\w[-.\w]{0,127}$', args.image):
401         # image ends with :valid-tag
402         if args.tag is not None:
403             logger.error(
404                 "image %r already includes a tag, cannot add tag argument %r",
405                 args.image, args.tag)
406             sys.exit(1)
407         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
408         args.image, args.tag = args.image.rsplit(':', 1)
409     elif args.tag is None:
410         args.tag = 'latest'
411
412     if '/' in args.image:
413         hostport, path = args.image.split('/', 1)
414         if hostport.endswith(':443'):
415             # "docker pull host:443/asdf" transparently removes the
416             # :443 (which is redundant because https is implied) and
417             # after it succeeds "docker images" will list "host/asdf",
418             # not "host:443/asdf".  If we strip the :443 then the name
419             # doesn't change underneath us.
420             args.image = '/'.join([hostport[:-4], path])
421
422     # Pull the image if requested, unless the image is specified as a hash
423     # that we already have.
424     if args.pull and not find_image_hashes(args.image):
425         pull_image(args.image, args.tag)
426
427     images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
428
429     image_hash = None
430     try:
431         image_hash = find_one_image_hash(args.image, args.tag)
432         if not docker_image_compatible(api, image_hash):
433             if args.force_image_format:
434                 logger.warning("forcing incompatible image")
435             else:
436                 logger.error("refusing to store " \
437                     "incompatible format (use --force-image-format to override)")
438                 sys.exit(1)
439     except DockerError as error:
440         if images_in_arv:
441             # We don't have Docker / we don't have the image locally,
442             # use image that's already uploaded to Arvados
443             image_hash = images_in_arv[0][1]['dockerhash']
444         else:
445             logger.error(str(error))
446             sys.exit(1)
447
448     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
449
450     if args.name is None:
451         if image_repo_tag:
452             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
453         else:
454             collection_name = 'Docker image {}'.format(image_hash[0:12])
455     else:
456         collection_name = args.name
457
458     # Acquire a lock so that only one arv-keepdocker process will
459     # dump/upload a particular docker image at a time.  Do this before
460     # checking if the image already exists in Arvados so that if there
461     # is an upload already underway, when that upload completes and
462     # this process gets a turn, it will discover the Docker image is
463     # already available and exit quickly.
464     outfile_name = '{}.tar'.format(image_hash)
465     lockfile_name = '{}.lock'.format(outfile_name)
466     lockfile = None
467     cache_dir = get_cache_dir()
468     if cache_dir:
469         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
470         fcntl.flock(lockfile, fcntl.LOCK_EX)
471
472     try:
473         if not args.force:
474             # Check if this image is already in Arvados.
475
476             # Project where everything should be owned
477             parent_project_uuid = args.project_uuid or api.users().current().execute(
478                 num_retries=args.retries)['uuid']
479
480             # Find image hash tags
481             existing_links = _get_docker_links(
482                 api, args.retries,
483                 filters=[['link_class', '=', 'docker_image_hash'],
484                          ['name', '=', image_hash]])
485             if existing_links:
486                 # get readable collections
487                 collections = api.collections().list(
488                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
489                     select=["uuid", "owner_uuid", "name", "manifest_text"]
490                     ).execute(num_retries=args.retries)['items']
491
492                 if collections:
493                     # check for repo+tag links on these collections
494                     if image_repo_tag:
495                         existing_repo_tag = _get_docker_links(
496                             api, args.retries,
497                             filters=[['link_class', '=', 'docker_image_repo+tag'],
498                                      ['name', '=', image_repo_tag],
499                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
500                     else:
501                         existing_repo_tag = []
502
503                     try:
504                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
505                     except StopIteration:
506                         # create new collection owned by the project
507                         coll_uuid = api.collections().create(
508                             body={"manifest_text": collections[0]['manifest_text'],
509                                   "name": collection_name,
510                                   "owner_uuid": parent_project_uuid,
511                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
512                             ensure_unique_name=True
513                             ).execute(num_retries=args.retries)['uuid']
514
515                     link_base = {'owner_uuid': parent_project_uuid,
516                                  'head_uuid':  coll_uuid,
517                                  'properties': existing_links[0]['properties']}
518
519                     if not any(items_owned_by(parent_project_uuid, existing_links)):
520                         # create image link owned by the project
521                         make_link(api, args.retries,
522                                   'docker_image_hash', image_hash, **link_base)
523
524                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
525                         # create repo+tag link owned by the project
526                         make_link(api, args.retries, 'docker_image_repo+tag',
527                                   image_repo_tag, **link_base)
528
529                     stdout.write(coll_uuid + "\n")
530
531                     sys.exit(0)
532
533         # Open a file for the saved image, and write it if needed.
534         image_file, need_save = prep_image_file(outfile_name)
535         if need_save:
536             save_image(image_hash, image_file)
537
538         # Call arv-put with switches we inherited from it
539         # (a.k.a., switches that aren't our own).
540         if arguments is None:
541             arguments = sys.argv[1:]
542         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
543         put_args = keepdocker_parser.parse_known_args(arguments)[1]
544
545         # Don't fail when cached manifest is invalid, just ignore the cache.
546         put_args += ['--batch']
547
548         if args.name is None:
549             put_args += ['--name', collection_name]
550
551         coll_uuid = arv_put.main(
552             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
553             install_sig_handlers=install_sig_handlers).strip()
554
555         # Managed properties could be already set
556         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
557         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
558         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
559
560         _, image_metadata = load_image_metadata(image_file)
561         link_base = {'head_uuid': coll_uuid, 'properties': {}}
562         if 'created' in image_metadata:
563             link_base['properties']['image_timestamp'] = image_metadata['created']
564         if args.project_uuid is not None:
565             link_base['owner_uuid'] = args.project_uuid
566
567         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
568         if image_repo_tag:
569             make_link(api, args.retries,
570                       'docker_image_repo+tag', image_repo_tag, **link_base)
571
572         # Clean up.
573         image_file.close()
574         for filename in [stat_cache_name(image_file), image_file.name]:
575             try:
576                 os.unlink(filename)
577             except OSError as error:
578                 if error.errno != errno.ENOENT:
579                     raise
580     finally:
581         if lockfile is not None:
582             # Closing the lockfile unlocks it.
583             lockfile.close()
584
585 if __name__ == '__main__':
586     main()