Merge branch '21020-pysdk-env-paths'
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import collections
7 import datetime
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import subprocess
15 import sys
16 import tarfile
17 import tempfile
18
19 import ciso8601
20 from operator import itemgetter
21 from pathlib import Path
22 from stat import *
23
24 import arvados
25 import arvados.config
26 import arvados.util
27 import arvados.commands._util as arv_cmd
28 import arvados.commands.put as arv_put
29 from arvados._version import __version__
30
31 from typing import (
32     Callable,
33 )
34
35 logger = logging.getLogger('arvados.keepdocker')
36 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
37                 else logging.INFO)
38
39 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
40 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
41
42 DockerImage = collections.namedtuple(
43     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
44
45 keepdocker_parser = argparse.ArgumentParser(add_help=False)
46 keepdocker_parser.add_argument(
47     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
48     help='Print version and exit.')
49 keepdocker_parser.add_argument(
50     '-f', '--force', action='store_true', default=False,
51     help="Re-upload the image even if it already exists on the server")
52 keepdocker_parser.add_argument(
53     '--force-image-format', action='store_true', default=False,
54     help="Proceed even if the image format is not supported by the server")
55
56 _group = keepdocker_parser.add_mutually_exclusive_group()
57 _group.add_argument(
58     '--pull', action='store_true', default=False,
59     help="Try to pull the latest image from Docker registry")
60 _group.add_argument(
61     '--no-pull', action='store_false', dest='pull',
62     help="Use locally installed image only, don't pull image from Docker registry (default)")
63
64 # Combine keepdocker options listed above with run_opts options of arv-put.
65 # The options inherited from arv-put include --name, --project-uuid,
66 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
67 arg_parser = argparse.ArgumentParser(
68         description="Upload or list Docker images in Arvados",
69         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
70
71 arg_parser.add_argument(
72     'image', nargs='?',
73     help="Docker image to upload: repo, repo:tag, or hash")
74 arg_parser.add_argument(
75     'tag', nargs='?',
76     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
77
78 class DockerError(Exception):
79     pass
80
81
82 def popen_docker(cmd, *args, **kwargs):
83     manage_stdin = ('stdin' not in kwargs)
84     kwargs.setdefault('stdin', subprocess.PIPE)
85     kwargs.setdefault('stdout', subprocess.PIPE)
86     kwargs.setdefault('stderr', subprocess.PIPE)
87     try:
88         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
89     except OSError:  # No docker in $PATH, try docker.io
90         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
91     if manage_stdin:
92         docker_proc.stdin.close()
93     return docker_proc
94
95 def check_docker(proc, description):
96     proc.wait()
97     if proc.returncode != 0:
98         raise DockerError("docker {} returned status code {}".
99                           format(description, proc.returncode))
100
101 def docker_image_format(image_hash):
102     """Return the registry format ('v1' or 'v2') of the given image."""
103     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
104                         stdout=subprocess.PIPE)
105     try:
106         image_id = next(cmd.stdout).decode('utf-8').strip()
107         if image_id.startswith('sha256:'):
108             return 'v2'
109         elif ':' not in image_id:
110             return 'v1'
111         else:
112             return 'unknown'
113     finally:
114         check_docker(cmd, "inspect")
115
116 def docker_image_compatible(api, image_hash):
117     supported = api._rootDesc.get('dockerImageFormats', [])
118     if not supported:
119         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
120         return False
121
122     fmt = docker_image_format(image_hash)
123     if fmt in supported:
124         return True
125     else:
126         logger.error("image format is {!r} " \
127             "but server supports only {!r}".format(fmt, supported))
128         return False
129
130 def docker_images():
131     # Yield a DockerImage tuple for each installed image.
132     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
133     list_output = iter(list_proc.stdout)
134     next(list_output)  # Ignore the header line
135     for line in list_output:
136         words = line.split()
137         words = [word.decode('utf-8') for word in words]
138         size_index = len(words) - 2
139         repo, tag, imageid = words[:3]
140         ctime = ' '.join(words[3:size_index])
141         vsize = ' '.join(words[size_index:])
142         yield DockerImage(repo, tag, imageid, ctime, vsize)
143     list_proc.stdout.close()
144     check_docker(list_proc, "images")
145
146 def find_image_hashes(image_search, image_tag=None):
147     # Query for a Docker images with the repository and tag and return
148     # the image ids in a list.  Returns empty list if no match is
149     # found.
150
151     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
152
153     inspect = list_proc.stdout.read()
154     list_proc.stdout.close()
155
156     imageinfo = json.loads(inspect)
157
158     return [i["Id"] for i in imageinfo]
159
160 def find_one_image_hash(image_search, image_tag=None):
161     hashes = find_image_hashes(image_search, image_tag)
162     hash_count = len(hashes)
163     if hash_count == 1:
164         return hashes.pop()
165     elif hash_count == 0:
166         raise DockerError("no matching image found")
167     else:
168         raise DockerError("{} images match {}".format(hash_count, image_search))
169
170 def stat_cache_name(image_file):
171     return getattr(image_file, 'name', image_file) + '.stat'
172
173 def pull_image(image_name, image_tag):
174     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
175                  "pull")
176
177 def save_image(image_hash, image_file):
178     # Save the specified Docker image to image_file, then try to save its
179     # stats so we can try to resume after interruption.
180     check_docker(popen_docker(['save', image_hash], stdout=image_file),
181                  "save")
182     image_file.flush()
183     try:
184         with open(stat_cache_name(image_file), 'w') as statfile:
185             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
186     except STAT_CACHE_ERRORS:
187         pass  # We won't resume from this cache.  No big deal.
188
189 def get_cache_dir(
190         mkparent: Callable[[], Path]=arvados.util._BaseDirectories('CACHE').storage_path,
191 ) -> str:
192     path = mkparent() / 'docker'
193     path.mkdir(mode=0o700, exist_ok=True)
194     return str(path)
195
196 def prep_image_file(filename):
197     # Return a file object ready to save a Docker image,
198     # and a boolean indicating whether or not we need to actually save the
199     # image (False if a cached save is available).
200     cache_dir = get_cache_dir()
201     if cache_dir is None:
202         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
203         need_save = True
204     else:
205         file_path = os.path.join(cache_dir, filename)
206         try:
207             with open(stat_cache_name(file_path)) as statfile:
208                 prev_stat = json.load(statfile)
209             now_stat = os.stat(file_path)
210             need_save = any(prev_stat[field] != now_stat[field]
211                             for field in [ST_MTIME, ST_SIZE])
212         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
213             need_save = True  # We couldn't compare against old stats
214         image_file = open(file_path, 'w+b' if need_save else 'rb')
215     return image_file, need_save
216
217 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
218     link_attrs.update({'link_class': link_class, 'name': link_name})
219     return api_client.links().create(body=link_attrs).execute(
220         num_retries=num_retries)
221
222 def docker_link_sort_key(link):
223     """Build a sort key to find the latest available Docker image.
224
225     To find one source collection for a Docker image referenced by
226     name or image id, the API server looks for a link with the most
227     recent `image_timestamp` property; then the most recent
228     `created_at` timestamp.  This method generates a sort key for
229     Docker metadata links to sort them from least to most preferred.
230     """
231     try:
232         image_timestamp = ciso8601.parse_datetime_as_naive(
233             link['properties']['image_timestamp'])
234     except (KeyError, ValueError):
235         image_timestamp = EARLIEST_DATETIME
236     try:
237         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
238     except ValueError:
239         created_timestamp = None
240     return (image_timestamp, created_timestamp)
241
242 def _get_docker_links(api_client, num_retries, **kwargs):
243     links = list(arvados.util.keyset_list_all(
244         api_client.links().list, num_retries=num_retries, **kwargs,
245     ))
246     for link in links:
247         link['_sort_key'] = docker_link_sort_key(link)
248     links.sort(key=itemgetter('_sort_key'), reverse=True)
249     return links
250
251 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
252     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
253     return {
254         '_sort_key': link['_sort_key'],
255         'timestamp': link['_sort_key'][timestamp_index],
256         'collection': link['head_uuid'],
257         'dockerhash': dockerhash,
258         'repo': repo,
259         'tag': tag,
260         }
261
262 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
263     """List all Docker images known to the api_client with image_name and
264     image_tag.  If no image_name is given, defaults to listing all
265     Docker images.
266
267     Returns a list of tuples representing matching Docker images,
268     sorted in preference order (i.e. the first collection in the list
269     is the one that the API server would use). Each tuple is a
270     (collection_uuid, collection_info) pair, where collection_info is
271     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
272
273     """
274     search_filters = []
275     repo_links = None
276     hash_links = None
277
278     project_filter = []
279     if project_uuid is not None:
280         project_filter = [["owner_uuid", "=", project_uuid]]
281
282     if image_name:
283         # Find images with the name the user specified.
284         search_links = _get_docker_links(
285             api_client, num_retries,
286             filters=[['link_class', '=', 'docker_image_repo+tag'],
287                      ['name', '=',
288                       '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
289         if search_links:
290             repo_links = search_links
291         else:
292             # Fall back to finding images with the specified image hash.
293             search_links = _get_docker_links(
294                 api_client, num_retries,
295                 filters=[['link_class', '=', 'docker_image_hash'],
296                          ['name', 'ilike', image_name + '%']]+project_filter)
297             hash_links = search_links
298         # Only list information about images that were found in the search.
299         search_filters.append(['head_uuid', 'in',
300                                [link['head_uuid'] for link in search_links]])
301
302     # It should be reasonable to expect that each collection only has one
303     # image hash (though there may be many links specifying this).  Find
304     # the API server's most preferred image hash link for each collection.
305     if hash_links is None:
306         hash_links = _get_docker_links(
307             api_client, num_retries,
308             filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
309     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
310
311     # Each collection may have more than one name (though again, one name
312     # may be specified more than once).  Build an image listing from name
313     # tags, sorted by API server preference.
314     if repo_links is None:
315         repo_links = _get_docker_links(
316             api_client, num_retries,
317             filters=search_filters + [['link_class', '=',
318                                        'docker_image_repo+tag']]+project_filter)
319     seen_image_names = collections.defaultdict(set)
320     images = []
321     for link in repo_links:
322         collection_uuid = link['head_uuid']
323         if link['name'] in seen_image_names[collection_uuid]:
324             continue
325         seen_image_names[collection_uuid].add(link['name'])
326         try:
327             dockerhash = hash_link_map[collection_uuid]['name']
328         except KeyError:
329             dockerhash = '<unknown>'
330         name_parts = link['name'].rsplit(':', 1)
331         images.append(_new_image_listing(link, dockerhash, *name_parts))
332
333     # Find any image hash links that did not have a corresponding name link,
334     # and add image listings for them, retaining the API server preference
335     # sorting.
336     images_start_size = len(images)
337     for collection_uuid, link in hash_link_map.items():
338         if not seen_image_names[collection_uuid]:
339             images.append(_new_image_listing(link, link['name']))
340     if len(images) > images_start_size:
341         images.sort(key=itemgetter('_sort_key'), reverse=True)
342
343     # Remove any image listings that refer to unknown collections.
344     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
345         api_client.collections().list,
346         num_retries=num_retries,
347         filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
348         select=['uuid'],
349     )}
350     return [(image['collection'], image) for image in images
351             if image['collection'] in existing_coll_uuids]
352
353 def items_owned_by(owner_uuid, arv_items):
354     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
355
356 def _uuid2pdh(api, uuid):
357     return api.collections().list(
358         filters=[['uuid', '=', uuid]],
359         select=['portable_data_hash'],
360     ).execute()['items'][0]['portable_data_hash']
361
362 def load_image_metadata(image_file):
363     """Load an image manifest and config from an archive
364
365     Given an image archive as an open binary file object, this function loads
366     the image manifest and configuration, deserializing each from JSON and
367     returning them in a 2-tuple of dicts.
368     """
369     image_file.seek(0)
370     with tarfile.open(fileobj=image_file) as image_tar:
371         with image_tar.extractfile('manifest.json') as manifest_file:
372             image_manifest_list = json.load(manifest_file)
373         # Because arv-keepdocker only saves one image, there should only be
374         # one manifest.  This extracts that from the list and raises
375         # ValueError if there's not exactly one.
376         image_manifest, = image_manifest_list
377         with image_tar.extractfile(image_manifest['Config']) as config_file:
378             image_config = json.load(config_file)
379     return image_manifest, image_config
380
381 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
382     args = arg_parser.parse_args(arguments)
383     if api is None:
384         api = arvados.api('v1', num_retries=args.retries)
385
386     if args.image is None or args.image == 'images':
387         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
388         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
389         try:
390             for i, j in list_images_in_arv(api, args.retries):
391                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
392         except IOError as e:
393             if e.errno == errno.EPIPE:
394                 pass
395             else:
396                 raise
397         sys.exit(0)
398
399     if re.search(r':\w[-.\w]{0,127}$', args.image):
400         # image ends with :valid-tag
401         if args.tag is not None:
402             logger.error(
403                 "image %r already includes a tag, cannot add tag argument %r",
404                 args.image, args.tag)
405             sys.exit(1)
406         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
407         args.image, args.tag = args.image.rsplit(':', 1)
408     elif args.tag is None:
409         args.tag = 'latest'
410
411     if '/' in args.image:
412         hostport, path = args.image.split('/', 1)
413         if hostport.endswith(':443'):
414             # "docker pull host:443/asdf" transparently removes the
415             # :443 (which is redundant because https is implied) and
416             # after it succeeds "docker images" will list "host/asdf",
417             # not "host:443/asdf".  If we strip the :443 then the name
418             # doesn't change underneath us.
419             args.image = '/'.join([hostport[:-4], path])
420
421     # Pull the image if requested, unless the image is specified as a hash
422     # that we already have.
423     if args.pull and not find_image_hashes(args.image):
424         pull_image(args.image, args.tag)
425
426     images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
427
428     image_hash = None
429     try:
430         image_hash = find_one_image_hash(args.image, args.tag)
431         if not docker_image_compatible(api, image_hash):
432             if args.force_image_format:
433                 logger.warning("forcing incompatible image")
434             else:
435                 logger.error("refusing to store " \
436                     "incompatible format (use --force-image-format to override)")
437                 sys.exit(1)
438     except DockerError as error:
439         if images_in_arv:
440             # We don't have Docker / we don't have the image locally,
441             # use image that's already uploaded to Arvados
442             image_hash = images_in_arv[0][1]['dockerhash']
443         else:
444             logger.error(str(error))
445             sys.exit(1)
446
447     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
448
449     if args.name is None:
450         if image_repo_tag:
451             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
452         else:
453             collection_name = 'Docker image {}'.format(image_hash[0:12])
454     else:
455         collection_name = args.name
456
457     # Acquire a lock so that only one arv-keepdocker process will
458     # dump/upload a particular docker image at a time.  Do this before
459     # checking if the image already exists in Arvados so that if there
460     # is an upload already underway, when that upload completes and
461     # this process gets a turn, it will discover the Docker image is
462     # already available and exit quickly.
463     outfile_name = '{}.tar'.format(image_hash)
464     lockfile_name = '{}.lock'.format(outfile_name)
465     lockfile = None
466     cache_dir = get_cache_dir()
467     if cache_dir:
468         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
469         fcntl.flock(lockfile, fcntl.LOCK_EX)
470
471     try:
472         if not args.force:
473             # Check if this image is already in Arvados.
474
475             # Project where everything should be owned
476             parent_project_uuid = args.project_uuid or api.users().current().execute(
477                 num_retries=args.retries)['uuid']
478
479             # Find image hash tags
480             existing_links = _get_docker_links(
481                 api, args.retries,
482                 filters=[['link_class', '=', 'docker_image_hash'],
483                          ['name', '=', image_hash]])
484             if existing_links:
485                 # get readable collections
486                 collections = api.collections().list(
487                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
488                     select=["uuid", "owner_uuid", "name", "manifest_text"]
489                     ).execute(num_retries=args.retries)['items']
490
491                 if collections:
492                     # check for repo+tag links on these collections
493                     if image_repo_tag:
494                         existing_repo_tag = _get_docker_links(
495                             api, args.retries,
496                             filters=[['link_class', '=', 'docker_image_repo+tag'],
497                                      ['name', '=', image_repo_tag],
498                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
499                     else:
500                         existing_repo_tag = []
501
502                     try:
503                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
504                     except StopIteration:
505                         # create new collection owned by the project
506                         coll_uuid = api.collections().create(
507                             body={"manifest_text": collections[0]['manifest_text'],
508                                   "name": collection_name,
509                                   "owner_uuid": parent_project_uuid,
510                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
511                             ensure_unique_name=True
512                             ).execute(num_retries=args.retries)['uuid']
513
514                     link_base = {'owner_uuid': parent_project_uuid,
515                                  'head_uuid':  coll_uuid,
516                                  'properties': existing_links[0]['properties']}
517
518                     if not any(items_owned_by(parent_project_uuid, existing_links)):
519                         # create image link owned by the project
520                         make_link(api, args.retries,
521                                   'docker_image_hash', image_hash, **link_base)
522
523                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
524                         # create repo+tag link owned by the project
525                         make_link(api, args.retries, 'docker_image_repo+tag',
526                                   image_repo_tag, **link_base)
527
528                     stdout.write(coll_uuid + "\n")
529
530                     sys.exit(0)
531
532         # Open a file for the saved image, and write it if needed.
533         image_file, need_save = prep_image_file(outfile_name)
534         if need_save:
535             save_image(image_hash, image_file)
536
537         # Call arv-put with switches we inherited from it
538         # (a.k.a., switches that aren't our own).
539         if arguments is None:
540             arguments = sys.argv[1:]
541         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
542         put_args = keepdocker_parser.parse_known_args(arguments)[1]
543
544         # Don't fail when cached manifest is invalid, just ignore the cache.
545         put_args += ['--batch']
546
547         if args.name is None:
548             put_args += ['--name', collection_name]
549
550         coll_uuid = arv_put.main(
551             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
552             install_sig_handlers=install_sig_handlers).strip()
553
554         # Managed properties could be already set
555         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
556         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
557         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
558
559         _, image_metadata = load_image_metadata(image_file)
560         link_base = {'head_uuid': coll_uuid, 'properties': {}}
561         if 'created' in image_metadata:
562             link_base['properties']['image_timestamp'] = image_metadata['created']
563         if args.project_uuid is not None:
564             link_base['owner_uuid'] = args.project_uuid
565
566         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
567         if image_repo_tag:
568             make_link(api, args.retries,
569                       'docker_image_repo+tag', image_repo_tag, **link_base)
570
571         # Clean up.
572         image_file.close()
573         for filename in [stat_cache_name(image_file), image_file.name]:
574             try:
575                 os.unlink(filename)
576             except OSError as error:
577                 if error.errno != errno.ENOENT:
578                     raise
579     finally:
580         if lockfile is not None:
581             # Closing the lockfile unlocks it.
582             lockfile.close()
583
584 if __name__ == '__main__':
585     main()