21128: Merge commit 'adbbc9e3c7a36d39b30f403555ee5889e32adcc0' into 21128-toolbar...
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import sys
14 import tarfile
15 import tempfile
16 import shutil
17 import _strptime
18 import fcntl
19 from operator import itemgetter
20 from stat import *
21
22 import subprocess
23
24 import arvados
25 import arvados.util
26 import arvados.commands._util as arv_cmd
27 import arvados.commands.put as arv_put
28 from arvados.collection import CollectionReader
29 import ciso8601
30 import logging
31 import arvados.config
32
33 from arvados._version import __version__
34
35 logger = logging.getLogger('arvados.keepdocker')
36 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
37                 else logging.INFO)
38
39 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
40 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
41
42 DockerImage = collections.namedtuple(
43     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
44
45 keepdocker_parser = argparse.ArgumentParser(add_help=False)
46 keepdocker_parser.add_argument(
47     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
48     help='Print version and exit.')
49 keepdocker_parser.add_argument(
50     '-f', '--force', action='store_true', default=False,
51     help="Re-upload the image even if it already exists on the server")
52 keepdocker_parser.add_argument(
53     '--force-image-format', action='store_true', default=False,
54     help="Proceed even if the image format is not supported by the server")
55
56 _group = keepdocker_parser.add_mutually_exclusive_group()
57 _group.add_argument(
58     '--pull', action='store_true', default=False,
59     help="Try to pull the latest image from Docker registry")
60 _group.add_argument(
61     '--no-pull', action='store_false', dest='pull',
62     help="Use locally installed image only, don't pull image from Docker registry (default)")
63
64 # Combine keepdocker options listed above with run_opts options of arv-put.
65 # The options inherited from arv-put include --name, --project-uuid,
66 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
67 arg_parser = argparse.ArgumentParser(
68         description="Upload or list Docker images in Arvados",
69         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
70
71 arg_parser.add_argument(
72     'image', nargs='?',
73     help="Docker image to upload: repo, repo:tag, or hash")
74 arg_parser.add_argument(
75     'tag', nargs='?',
76     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
77
78 class DockerError(Exception):
79     pass
80
81
82 def popen_docker(cmd, *args, **kwargs):
83     manage_stdin = ('stdin' not in kwargs)
84     kwargs.setdefault('stdin', subprocess.PIPE)
85     kwargs.setdefault('stdout', subprocess.PIPE)
86     kwargs.setdefault('stderr', subprocess.PIPE)
87     try:
88         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
89     except OSError:  # No docker in $PATH, try docker.io
90         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
91     if manage_stdin:
92         docker_proc.stdin.close()
93     return docker_proc
94
95 def check_docker(proc, description):
96     proc.wait()
97     if proc.returncode != 0:
98         raise DockerError("docker {} returned status code {}".
99                           format(description, proc.returncode))
100
101 def docker_image_format(image_hash):
102     """Return the registry format ('v1' or 'v2') of the given image."""
103     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
104                         stdout=subprocess.PIPE)
105     try:
106         image_id = next(cmd.stdout).decode('utf-8').strip()
107         if image_id.startswith('sha256:'):
108             return 'v2'
109         elif ':' not in image_id:
110             return 'v1'
111         else:
112             return 'unknown'
113     finally:
114         check_docker(cmd, "inspect")
115
116 def docker_image_compatible(api, image_hash):
117     supported = api._rootDesc.get('dockerImageFormats', [])
118     if not supported:
119         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
120         return False
121
122     fmt = docker_image_format(image_hash)
123     if fmt in supported:
124         return True
125     else:
126         logger.error("image format is {!r} " \
127             "but server supports only {!r}".format(fmt, supported))
128         return False
129
130 def docker_images():
131     # Yield a DockerImage tuple for each installed image.
132     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
133     list_output = iter(list_proc.stdout)
134     next(list_output)  # Ignore the header line
135     for line in list_output:
136         words = line.split()
137         words = [word.decode('utf-8') for word in words]
138         size_index = len(words) - 2
139         repo, tag, imageid = words[:3]
140         ctime = ' '.join(words[3:size_index])
141         vsize = ' '.join(words[size_index:])
142         yield DockerImage(repo, tag, imageid, ctime, vsize)
143     list_proc.stdout.close()
144     check_docker(list_proc, "images")
145
146 def find_image_hashes(image_search, image_tag=None):
147     # Query for a Docker images with the repository and tag and return
148     # the image ids in a list.  Returns empty list if no match is
149     # found.
150
151     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
152
153     inspect = list_proc.stdout.read()
154     list_proc.stdout.close()
155
156     imageinfo = json.loads(inspect)
157
158     return [i["Id"] for i in imageinfo]
159
160 def find_one_image_hash(image_search, image_tag=None):
161     hashes = find_image_hashes(image_search, image_tag)
162     hash_count = len(hashes)
163     if hash_count == 1:
164         return hashes.pop()
165     elif hash_count == 0:
166         raise DockerError("no matching image found")
167     else:
168         raise DockerError("{} images match {}".format(hash_count, image_search))
169
170 def stat_cache_name(image_file):
171     return getattr(image_file, 'name', image_file) + '.stat'
172
173 def pull_image(image_name, image_tag):
174     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
175                  "pull")
176
177 def save_image(image_hash, image_file):
178     # Save the specified Docker image to image_file, then try to save its
179     # stats so we can try to resume after interruption.
180     check_docker(popen_docker(['save', image_hash], stdout=image_file),
181                  "save")
182     image_file.flush()
183     try:
184         with open(stat_cache_name(image_file), 'w') as statfile:
185             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
186     except STAT_CACHE_ERRORS:
187         pass  # We won't resume from this cache.  No big deal.
188
189 def get_cache_dir():
190     return arv_cmd.make_home_conf_dir(
191         os.path.join('.cache', 'arvados', 'docker'), 0o700)
192
193 def prep_image_file(filename):
194     # Return a file object ready to save a Docker image,
195     # and a boolean indicating whether or not we need to actually save the
196     # image (False if a cached save is available).
197     cache_dir = get_cache_dir()
198     if cache_dir is None:
199         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
200         need_save = True
201     else:
202         file_path = os.path.join(cache_dir, filename)
203         try:
204             with open(stat_cache_name(file_path)) as statfile:
205                 prev_stat = json.load(statfile)
206             now_stat = os.stat(file_path)
207             need_save = any(prev_stat[field] != now_stat[field]
208                             for field in [ST_MTIME, ST_SIZE])
209         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
210             need_save = True  # We couldn't compare against old stats
211         image_file = open(file_path, 'w+b' if need_save else 'rb')
212     return image_file, need_save
213
214 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
215     link_attrs.update({'link_class': link_class, 'name': link_name})
216     return api_client.links().create(body=link_attrs).execute(
217         num_retries=num_retries)
218
219 def docker_link_sort_key(link):
220     """Build a sort key to find the latest available Docker image.
221
222     To find one source collection for a Docker image referenced by
223     name or image id, the API server looks for a link with the most
224     recent `image_timestamp` property; then the most recent
225     `created_at` timestamp.  This method generates a sort key for
226     Docker metadata links to sort them from least to most preferred.
227     """
228     try:
229         image_timestamp = ciso8601.parse_datetime_as_naive(
230             link['properties']['image_timestamp'])
231     except (KeyError, ValueError):
232         image_timestamp = EARLIEST_DATETIME
233     try:
234         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
235     except ValueError:
236         created_timestamp = None
237     return (image_timestamp, created_timestamp)
238
239 def _get_docker_links(api_client, num_retries, **kwargs):
240     links = list(arvados.util.keyset_list_all(
241         api_client.links().list, num_retries=num_retries, **kwargs,
242     ))
243     for link in links:
244         link['_sort_key'] = docker_link_sort_key(link)
245     links.sort(key=itemgetter('_sort_key'), reverse=True)
246     return links
247
248 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
249     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
250     return {
251         '_sort_key': link['_sort_key'],
252         'timestamp': link['_sort_key'][timestamp_index],
253         'collection': link['head_uuid'],
254         'dockerhash': dockerhash,
255         'repo': repo,
256         'tag': tag,
257         }
258
259 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
260     """List all Docker images known to the api_client with image_name and
261     image_tag.  If no image_name is given, defaults to listing all
262     Docker images.
263
264     Returns a list of tuples representing matching Docker images,
265     sorted in preference order (i.e. the first collection in the list
266     is the one that the API server would use). Each tuple is a
267     (collection_uuid, collection_info) pair, where collection_info is
268     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
269
270     """
271     search_filters = []
272     repo_links = None
273     hash_links = None
274
275     project_filter = []
276     if project_uuid is not None:
277         project_filter = [["owner_uuid", "=", project_uuid]]
278
279     if image_name:
280         # Find images with the name the user specified.
281         search_links = _get_docker_links(
282             api_client, num_retries,
283             filters=[['link_class', '=', 'docker_image_repo+tag'],
284                      ['name', '=',
285                       '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
286         if search_links:
287             repo_links = search_links
288         else:
289             # Fall back to finding images with the specified image hash.
290             search_links = _get_docker_links(
291                 api_client, num_retries,
292                 filters=[['link_class', '=', 'docker_image_hash'],
293                          ['name', 'ilike', image_name + '%']]+project_filter)
294             hash_links = search_links
295         # Only list information about images that were found in the search.
296         search_filters.append(['head_uuid', 'in',
297                                [link['head_uuid'] for link in search_links]])
298
299     # It should be reasonable to expect that each collection only has one
300     # image hash (though there may be many links specifying this).  Find
301     # the API server's most preferred image hash link for each collection.
302     if hash_links is None:
303         hash_links = _get_docker_links(
304             api_client, num_retries,
305             filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
306     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
307
308     # Each collection may have more than one name (though again, one name
309     # may be specified more than once).  Build an image listing from name
310     # tags, sorted by API server preference.
311     if repo_links is None:
312         repo_links = _get_docker_links(
313             api_client, num_retries,
314             filters=search_filters + [['link_class', '=',
315                                        'docker_image_repo+tag']]+project_filter)
316     seen_image_names = collections.defaultdict(set)
317     images = []
318     for link in repo_links:
319         collection_uuid = link['head_uuid']
320         if link['name'] in seen_image_names[collection_uuid]:
321             continue
322         seen_image_names[collection_uuid].add(link['name'])
323         try:
324             dockerhash = hash_link_map[collection_uuid]['name']
325         except KeyError:
326             dockerhash = '<unknown>'
327         name_parts = link['name'].rsplit(':', 1)
328         images.append(_new_image_listing(link, dockerhash, *name_parts))
329
330     # Find any image hash links that did not have a corresponding name link,
331     # and add image listings for them, retaining the API server preference
332     # sorting.
333     images_start_size = len(images)
334     for collection_uuid, link in hash_link_map.items():
335         if not seen_image_names[collection_uuid]:
336             images.append(_new_image_listing(link, link['name']))
337     if len(images) > images_start_size:
338         images.sort(key=itemgetter('_sort_key'), reverse=True)
339
340     # Remove any image listings that refer to unknown collections.
341     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
342         api_client.collections().list,
343         num_retries=num_retries,
344         filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
345         select=['uuid'],
346     )}
347     return [(image['collection'], image) for image in images
348             if image['collection'] in existing_coll_uuids]
349
350 def items_owned_by(owner_uuid, arv_items):
351     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
352
353 def _uuid2pdh(api, uuid):
354     return api.collections().list(
355         filters=[['uuid', '=', uuid]],
356         select=['portable_data_hash'],
357     ).execute()['items'][0]['portable_data_hash']
358
359 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
360     args = arg_parser.parse_args(arguments)
361     if api is None:
362         api = arvados.api('v1', num_retries=args.retries)
363
364     if args.image is None or args.image == 'images':
365         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
366         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
367         try:
368             for i, j in list_images_in_arv(api, args.retries):
369                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
370         except IOError as e:
371             if e.errno == errno.EPIPE:
372                 pass
373             else:
374                 raise
375         sys.exit(0)
376
377     if re.search(r':\w[-.\w]{0,127}$', args.image):
378         # image ends with :valid-tag
379         if args.tag is not None:
380             logger.error(
381                 "image %r already includes a tag, cannot add tag argument %r",
382                 args.image, args.tag)
383             sys.exit(1)
384         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
385         args.image, args.tag = args.image.rsplit(':', 1)
386     elif args.tag is None:
387         args.tag = 'latest'
388
389     if '/' in args.image:
390         hostport, path = args.image.split('/', 1)
391         if hostport.endswith(':443'):
392             # "docker pull host:443/asdf" transparently removes the
393             # :443 (which is redundant because https is implied) and
394             # after it succeeds "docker images" will list "host/asdf",
395             # not "host:443/asdf".  If we strip the :443 then the name
396             # doesn't change underneath us.
397             args.image = '/'.join([hostport[:-4], path])
398
399     # Pull the image if requested, unless the image is specified as a hash
400     # that we already have.
401     if args.pull and not find_image_hashes(args.image):
402         pull_image(args.image, args.tag)
403
404     images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
405
406     image_hash = None
407     try:
408         image_hash = find_one_image_hash(args.image, args.tag)
409         if not docker_image_compatible(api, image_hash):
410             if args.force_image_format:
411                 logger.warning("forcing incompatible image")
412             else:
413                 logger.error("refusing to store " \
414                     "incompatible format (use --force-image-format to override)")
415                 sys.exit(1)
416     except DockerError as error:
417         if images_in_arv:
418             # We don't have Docker / we don't have the image locally,
419             # use image that's already uploaded to Arvados
420             image_hash = images_in_arv[0][1]['dockerhash']
421         else:
422             logger.error(str(error))
423             sys.exit(1)
424
425     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
426
427     if args.name is None:
428         if image_repo_tag:
429             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
430         else:
431             collection_name = 'Docker image {}'.format(image_hash[0:12])
432     else:
433         collection_name = args.name
434
435     # Acquire a lock so that only one arv-keepdocker process will
436     # dump/upload a particular docker image at a time.  Do this before
437     # checking if the image already exists in Arvados so that if there
438     # is an upload already underway, when that upload completes and
439     # this process gets a turn, it will discover the Docker image is
440     # already available and exit quickly.
441     outfile_name = '{}.tar'.format(image_hash)
442     lockfile_name = '{}.lock'.format(outfile_name)
443     lockfile = None
444     cache_dir = get_cache_dir()
445     if cache_dir:
446         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
447         fcntl.flock(lockfile, fcntl.LOCK_EX)
448
449     try:
450         if not args.force:
451             # Check if this image is already in Arvados.
452
453             # Project where everything should be owned
454             parent_project_uuid = args.project_uuid or api.users().current().execute(
455                 num_retries=args.retries)['uuid']
456
457             # Find image hash tags
458             existing_links = _get_docker_links(
459                 api, args.retries,
460                 filters=[['link_class', '=', 'docker_image_hash'],
461                          ['name', '=', image_hash]])
462             if existing_links:
463                 # get readable collections
464                 collections = api.collections().list(
465                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
466                     select=["uuid", "owner_uuid", "name", "manifest_text"]
467                     ).execute(num_retries=args.retries)['items']
468
469                 if collections:
470                     # check for repo+tag links on these collections
471                     if image_repo_tag:
472                         existing_repo_tag = _get_docker_links(
473                             api, args.retries,
474                             filters=[['link_class', '=', 'docker_image_repo+tag'],
475                                      ['name', '=', image_repo_tag],
476                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
477                     else:
478                         existing_repo_tag = []
479
480                     try:
481                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
482                     except StopIteration:
483                         # create new collection owned by the project
484                         coll_uuid = api.collections().create(
485                             body={"manifest_text": collections[0]['manifest_text'],
486                                   "name": collection_name,
487                                   "owner_uuid": parent_project_uuid,
488                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
489                             ensure_unique_name=True
490                             ).execute(num_retries=args.retries)['uuid']
491
492                     link_base = {'owner_uuid': parent_project_uuid,
493                                  'head_uuid':  coll_uuid,
494                                  'properties': existing_links[0]['properties']}
495
496                     if not any(items_owned_by(parent_project_uuid, existing_links)):
497                         # create image link owned by the project
498                         make_link(api, args.retries,
499                                   'docker_image_hash', image_hash, **link_base)
500
501                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
502                         # create repo+tag link owned by the project
503                         make_link(api, args.retries, 'docker_image_repo+tag',
504                                   image_repo_tag, **link_base)
505
506                     stdout.write(coll_uuid + "\n")
507
508                     sys.exit(0)
509
510         # Open a file for the saved image, and write it if needed.
511         image_file, need_save = prep_image_file(outfile_name)
512         if need_save:
513             save_image(image_hash, image_file)
514
515         # Call arv-put with switches we inherited from it
516         # (a.k.a., switches that aren't our own).
517         if arguments is None:
518             arguments = sys.argv[1:]
519         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
520         put_args = keepdocker_parser.parse_known_args(arguments)[1]
521
522         # Don't fail when cached manifest is invalid, just ignore the cache.
523         put_args += ['--batch']
524
525         if args.name is None:
526             put_args += ['--name', collection_name]
527
528         coll_uuid = arv_put.main(
529             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
530             install_sig_handlers=install_sig_handlers).strip()
531
532         # Managed properties could be already set
533         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
534         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
535
536         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
537
538         # Read the image metadata and make Arvados links from it.
539         image_file.seek(0)
540         image_tar = tarfile.open(fileobj=image_file)
541         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
542         if image_hash_type:
543             json_filename = raw_image_hash + '.json'
544         else:
545             json_filename = raw_image_hash + '/json'
546         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
547         image_metadata = json.loads(json_file.read().decode('utf-8'))
548         json_file.close()
549         image_tar.close()
550         link_base = {'head_uuid': coll_uuid, 'properties': {}}
551         if 'created' in image_metadata:
552             link_base['properties']['image_timestamp'] = image_metadata['created']
553         if args.project_uuid is not None:
554             link_base['owner_uuid'] = args.project_uuid
555
556         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
557         if image_repo_tag:
558             make_link(api, args.retries,
559                       'docker_image_repo+tag', image_repo_tag, **link_base)
560
561         # Clean up.
562         image_file.close()
563         for filename in [stat_cache_name(image_file), image_file.name]:
564             try:
565                 os.unlink(filename)
566             except OSError as error:
567                 if error.errno != errno.ENOENT:
568                     raise
569     finally:
570         if lockfile is not None:
571             # Closing the lockfile unlocks it.
572             lockfile.close()
573
574 if __name__ == '__main__':
575     main()