21452: Clean imports in arvados.commands._util
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import collections
7 import datetime
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import subprocess
15 import sys
16 import tarfile
17 import tempfile
18
19 import ciso8601
20 from operator import itemgetter
21 from stat import *
22
23 import arvados
24 import arvados.config
25 import arvados.util
26 import arvados.commands._util as arv_cmd
27 import arvados.commands.put as arv_put
28 from arvados._version import __version__
29
30 logger = logging.getLogger('arvados.keepdocker')
31 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
32                 else logging.INFO)
33
34 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
35 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
36
37 DockerImage = collections.namedtuple(
38     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
39
40 keepdocker_parser = argparse.ArgumentParser(add_help=False)
41 keepdocker_parser.add_argument(
42     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
43     help='Print version and exit.')
44 keepdocker_parser.add_argument(
45     '-f', '--force', action='store_true', default=False,
46     help="Re-upload the image even if it already exists on the server")
47 keepdocker_parser.add_argument(
48     '--force-image-format', action='store_true', default=False,
49     help="Proceed even if the image format is not supported by the server")
50
51 _group = keepdocker_parser.add_mutually_exclusive_group()
52 _group.add_argument(
53     '--pull', action='store_true', default=False,
54     help="Try to pull the latest image from Docker registry")
55 _group.add_argument(
56     '--no-pull', action='store_false', dest='pull',
57     help="Use locally installed image only, don't pull image from Docker registry (default)")
58
59 # Combine keepdocker options listed above with run_opts options of arv-put.
60 # The options inherited from arv-put include --name, --project-uuid,
61 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
62 arg_parser = argparse.ArgumentParser(
63         description="Upload or list Docker images in Arvados",
64         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
65
66 arg_parser.add_argument(
67     'image', nargs='?',
68     help="Docker image to upload: repo, repo:tag, or hash")
69 arg_parser.add_argument(
70     'tag', nargs='?',
71     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
72
73 class DockerError(Exception):
74     pass
75
76
77 def popen_docker(cmd, *args, **kwargs):
78     manage_stdin = ('stdin' not in kwargs)
79     kwargs.setdefault('stdin', subprocess.PIPE)
80     kwargs.setdefault('stdout', subprocess.PIPE)
81     kwargs.setdefault('stderr', subprocess.PIPE)
82     try:
83         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
84     except OSError:  # No docker in $PATH, try docker.io
85         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
86     if manage_stdin:
87         docker_proc.stdin.close()
88     return docker_proc
89
90 def check_docker(proc, description):
91     proc.wait()
92     if proc.returncode != 0:
93         raise DockerError("docker {} returned status code {}".
94                           format(description, proc.returncode))
95
96 def docker_image_format(image_hash):
97     """Return the registry format ('v1' or 'v2') of the given image."""
98     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
99                         stdout=subprocess.PIPE)
100     try:
101         image_id = next(cmd.stdout).decode('utf-8').strip()
102         if image_id.startswith('sha256:'):
103             return 'v2'
104         elif ':' not in image_id:
105             return 'v1'
106         else:
107             return 'unknown'
108     finally:
109         check_docker(cmd, "inspect")
110
111 def docker_image_compatible(api, image_hash):
112     supported = api._rootDesc.get('dockerImageFormats', [])
113     if not supported:
114         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
115         return False
116
117     fmt = docker_image_format(image_hash)
118     if fmt in supported:
119         return True
120     else:
121         logger.error("image format is {!r} " \
122             "but server supports only {!r}".format(fmt, supported))
123         return False
124
125 def docker_images():
126     # Yield a DockerImage tuple for each installed image.
127     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
128     list_output = iter(list_proc.stdout)
129     next(list_output)  # Ignore the header line
130     for line in list_output:
131         words = line.split()
132         words = [word.decode('utf-8') for word in words]
133         size_index = len(words) - 2
134         repo, tag, imageid = words[:3]
135         ctime = ' '.join(words[3:size_index])
136         vsize = ' '.join(words[size_index:])
137         yield DockerImage(repo, tag, imageid, ctime, vsize)
138     list_proc.stdout.close()
139     check_docker(list_proc, "images")
140
141 def find_image_hashes(image_search, image_tag=None):
142     # Query for a Docker images with the repository and tag and return
143     # the image ids in a list.  Returns empty list if no match is
144     # found.
145
146     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
147
148     inspect = list_proc.stdout.read()
149     list_proc.stdout.close()
150
151     imageinfo = json.loads(inspect)
152
153     return [i["Id"] for i in imageinfo]
154
155 def find_one_image_hash(image_search, image_tag=None):
156     hashes = find_image_hashes(image_search, image_tag)
157     hash_count = len(hashes)
158     if hash_count == 1:
159         return hashes.pop()
160     elif hash_count == 0:
161         raise DockerError("no matching image found")
162     else:
163         raise DockerError("{} images match {}".format(hash_count, image_search))
164
165 def stat_cache_name(image_file):
166     return getattr(image_file, 'name', image_file) + '.stat'
167
168 def pull_image(image_name, image_tag):
169     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
170                  "pull")
171
172 def save_image(image_hash, image_file):
173     # Save the specified Docker image to image_file, then try to save its
174     # stats so we can try to resume after interruption.
175     check_docker(popen_docker(['save', image_hash], stdout=image_file),
176                  "save")
177     image_file.flush()
178     try:
179         with open(stat_cache_name(image_file), 'w') as statfile:
180             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
181     except STAT_CACHE_ERRORS:
182         pass  # We won't resume from this cache.  No big deal.
183
184 def get_cache_dir():
185     return arv_cmd.make_home_conf_dir(
186         os.path.join('.cache', 'arvados', 'docker'), 0o700)
187
188 def prep_image_file(filename):
189     # Return a file object ready to save a Docker image,
190     # and a boolean indicating whether or not we need to actually save the
191     # image (False if a cached save is available).
192     cache_dir = get_cache_dir()
193     if cache_dir is None:
194         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
195         need_save = True
196     else:
197         file_path = os.path.join(cache_dir, filename)
198         try:
199             with open(stat_cache_name(file_path)) as statfile:
200                 prev_stat = json.load(statfile)
201             now_stat = os.stat(file_path)
202             need_save = any(prev_stat[field] != now_stat[field]
203                             for field in [ST_MTIME, ST_SIZE])
204         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
205             need_save = True  # We couldn't compare against old stats
206         image_file = open(file_path, 'w+b' if need_save else 'rb')
207     return image_file, need_save
208
209 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
210     link_attrs.update({'link_class': link_class, 'name': link_name})
211     return api_client.links().create(body=link_attrs).execute(
212         num_retries=num_retries)
213
214 def docker_link_sort_key(link):
215     """Build a sort key to find the latest available Docker image.
216
217     To find one source collection for a Docker image referenced by
218     name or image id, the API server looks for a link with the most
219     recent `image_timestamp` property; then the most recent
220     `created_at` timestamp.  This method generates a sort key for
221     Docker metadata links to sort them from least to most preferred.
222     """
223     try:
224         image_timestamp = ciso8601.parse_datetime_as_naive(
225             link['properties']['image_timestamp'])
226     except (KeyError, ValueError):
227         image_timestamp = EARLIEST_DATETIME
228     try:
229         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
230     except ValueError:
231         created_timestamp = None
232     return (image_timestamp, created_timestamp)
233
234 def _get_docker_links(api_client, num_retries, **kwargs):
235     links = list(arvados.util.keyset_list_all(
236         api_client.links().list, num_retries=num_retries, **kwargs,
237     ))
238     for link in links:
239         link['_sort_key'] = docker_link_sort_key(link)
240     links.sort(key=itemgetter('_sort_key'), reverse=True)
241     return links
242
243 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
244     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
245     return {
246         '_sort_key': link['_sort_key'],
247         'timestamp': link['_sort_key'][timestamp_index],
248         'collection': link['head_uuid'],
249         'dockerhash': dockerhash,
250         'repo': repo,
251         'tag': tag,
252         }
253
254 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
255     """List all Docker images known to the api_client with image_name and
256     image_tag.  If no image_name is given, defaults to listing all
257     Docker images.
258
259     Returns a list of tuples representing matching Docker images,
260     sorted in preference order (i.e. the first collection in the list
261     is the one that the API server would use). Each tuple is a
262     (collection_uuid, collection_info) pair, where collection_info is
263     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
264
265     """
266     search_filters = []
267     repo_links = None
268     hash_links = None
269
270     project_filter = []
271     if project_uuid is not None:
272         project_filter = [["owner_uuid", "=", project_uuid]]
273
274     if image_name:
275         # Find images with the name the user specified.
276         search_links = _get_docker_links(
277             api_client, num_retries,
278             filters=[['link_class', '=', 'docker_image_repo+tag'],
279                      ['name', '=',
280                       '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
281         if search_links:
282             repo_links = search_links
283         else:
284             # Fall back to finding images with the specified image hash.
285             search_links = _get_docker_links(
286                 api_client, num_retries,
287                 filters=[['link_class', '=', 'docker_image_hash'],
288                          ['name', 'ilike', image_name + '%']]+project_filter)
289             hash_links = search_links
290         # Only list information about images that were found in the search.
291         search_filters.append(['head_uuid', 'in',
292                                [link['head_uuid'] for link in search_links]])
293
294     # It should be reasonable to expect that each collection only has one
295     # image hash (though there may be many links specifying this).  Find
296     # the API server's most preferred image hash link for each collection.
297     if hash_links is None:
298         hash_links = _get_docker_links(
299             api_client, num_retries,
300             filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
301     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
302
303     # Each collection may have more than one name (though again, one name
304     # may be specified more than once).  Build an image listing from name
305     # tags, sorted by API server preference.
306     if repo_links is None:
307         repo_links = _get_docker_links(
308             api_client, num_retries,
309             filters=search_filters + [['link_class', '=',
310                                        'docker_image_repo+tag']]+project_filter)
311     seen_image_names = collections.defaultdict(set)
312     images = []
313     for link in repo_links:
314         collection_uuid = link['head_uuid']
315         if link['name'] in seen_image_names[collection_uuid]:
316             continue
317         seen_image_names[collection_uuid].add(link['name'])
318         try:
319             dockerhash = hash_link_map[collection_uuid]['name']
320         except KeyError:
321             dockerhash = '<unknown>'
322         name_parts = link['name'].rsplit(':', 1)
323         images.append(_new_image_listing(link, dockerhash, *name_parts))
324
325     # Find any image hash links that did not have a corresponding name link,
326     # and add image listings for them, retaining the API server preference
327     # sorting.
328     images_start_size = len(images)
329     for collection_uuid, link in hash_link_map.items():
330         if not seen_image_names[collection_uuid]:
331             images.append(_new_image_listing(link, link['name']))
332     if len(images) > images_start_size:
333         images.sort(key=itemgetter('_sort_key'), reverse=True)
334
335     # Remove any image listings that refer to unknown collections.
336     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
337         api_client.collections().list,
338         num_retries=num_retries,
339         filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
340         select=['uuid'],
341     )}
342     return [(image['collection'], image) for image in images
343             if image['collection'] in existing_coll_uuids]
344
345 def items_owned_by(owner_uuid, arv_items):
346     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
347
348 def _uuid2pdh(api, uuid):
349     return api.collections().list(
350         filters=[['uuid', '=', uuid]],
351         select=['portable_data_hash'],
352     ).execute()['items'][0]['portable_data_hash']
353
354 def load_image_metadata(image_file):
355     """Load an image manifest and config from an archive
356
357     Given an image archive as an open binary file object, this function loads
358     the image manifest and configuration, deserializing each from JSON and
359     returning them in a 2-tuple of dicts.
360     """
361     image_file.seek(0)
362     with tarfile.open(fileobj=image_file) as image_tar:
363         with image_tar.extractfile('manifest.json') as manifest_file:
364             image_manifest_list = json.load(manifest_file)
365         # Because arv-keepdocker only saves one image, there should only be
366         # one manifest.  This extracts that from the list and raises
367         # ValueError if there's not exactly one.
368         image_manifest, = image_manifest_list
369         with image_tar.extractfile(image_manifest['Config']) as config_file:
370             image_config = json.load(config_file)
371     return image_manifest, image_config
372
373 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
374     args = arg_parser.parse_args(arguments)
375     if api is None:
376         api = arvados.api('v1', num_retries=args.retries)
377
378     if args.image is None or args.image == 'images':
379         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
380         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
381         try:
382             for i, j in list_images_in_arv(api, args.retries):
383                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
384         except IOError as e:
385             if e.errno == errno.EPIPE:
386                 pass
387             else:
388                 raise
389         sys.exit(0)
390
391     if re.search(r':\w[-.\w]{0,127}$', args.image):
392         # image ends with :valid-tag
393         if args.tag is not None:
394             logger.error(
395                 "image %r already includes a tag, cannot add tag argument %r",
396                 args.image, args.tag)
397             sys.exit(1)
398         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
399         args.image, args.tag = args.image.rsplit(':', 1)
400     elif args.tag is None:
401         args.tag = 'latest'
402
403     if '/' in args.image:
404         hostport, path = args.image.split('/', 1)
405         if hostport.endswith(':443'):
406             # "docker pull host:443/asdf" transparently removes the
407             # :443 (which is redundant because https is implied) and
408             # after it succeeds "docker images" will list "host/asdf",
409             # not "host:443/asdf".  If we strip the :443 then the name
410             # doesn't change underneath us.
411             args.image = '/'.join([hostport[:-4], path])
412
413     # Pull the image if requested, unless the image is specified as a hash
414     # that we already have.
415     if args.pull and not find_image_hashes(args.image):
416         pull_image(args.image, args.tag)
417
418     images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
419
420     image_hash = None
421     try:
422         image_hash = find_one_image_hash(args.image, args.tag)
423         if not docker_image_compatible(api, image_hash):
424             if args.force_image_format:
425                 logger.warning("forcing incompatible image")
426             else:
427                 logger.error("refusing to store " \
428                     "incompatible format (use --force-image-format to override)")
429                 sys.exit(1)
430     except DockerError as error:
431         if images_in_arv:
432             # We don't have Docker / we don't have the image locally,
433             # use image that's already uploaded to Arvados
434             image_hash = images_in_arv[0][1]['dockerhash']
435         else:
436             logger.error(str(error))
437             sys.exit(1)
438
439     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
440
441     if args.name is None:
442         if image_repo_tag:
443             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
444         else:
445             collection_name = 'Docker image {}'.format(image_hash[0:12])
446     else:
447         collection_name = args.name
448
449     # Acquire a lock so that only one arv-keepdocker process will
450     # dump/upload a particular docker image at a time.  Do this before
451     # checking if the image already exists in Arvados so that if there
452     # is an upload already underway, when that upload completes and
453     # this process gets a turn, it will discover the Docker image is
454     # already available and exit quickly.
455     outfile_name = '{}.tar'.format(image_hash)
456     lockfile_name = '{}.lock'.format(outfile_name)
457     lockfile = None
458     cache_dir = get_cache_dir()
459     if cache_dir:
460         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
461         fcntl.flock(lockfile, fcntl.LOCK_EX)
462
463     try:
464         if not args.force:
465             # Check if this image is already in Arvados.
466
467             # Project where everything should be owned
468             parent_project_uuid = args.project_uuid or api.users().current().execute(
469                 num_retries=args.retries)['uuid']
470
471             # Find image hash tags
472             existing_links = _get_docker_links(
473                 api, args.retries,
474                 filters=[['link_class', '=', 'docker_image_hash'],
475                          ['name', '=', image_hash]])
476             if existing_links:
477                 # get readable collections
478                 collections = api.collections().list(
479                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
480                     select=["uuid", "owner_uuid", "name", "manifest_text"]
481                     ).execute(num_retries=args.retries)['items']
482
483                 if collections:
484                     # check for repo+tag links on these collections
485                     if image_repo_tag:
486                         existing_repo_tag = _get_docker_links(
487                             api, args.retries,
488                             filters=[['link_class', '=', 'docker_image_repo+tag'],
489                                      ['name', '=', image_repo_tag],
490                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
491                     else:
492                         existing_repo_tag = []
493
494                     try:
495                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
496                     except StopIteration:
497                         # create new collection owned by the project
498                         coll_uuid = api.collections().create(
499                             body={"manifest_text": collections[0]['manifest_text'],
500                                   "name": collection_name,
501                                   "owner_uuid": parent_project_uuid,
502                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
503                             ensure_unique_name=True
504                             ).execute(num_retries=args.retries)['uuid']
505
506                     link_base = {'owner_uuid': parent_project_uuid,
507                                  'head_uuid':  coll_uuid,
508                                  'properties': existing_links[0]['properties']}
509
510                     if not any(items_owned_by(parent_project_uuid, existing_links)):
511                         # create image link owned by the project
512                         make_link(api, args.retries,
513                                   'docker_image_hash', image_hash, **link_base)
514
515                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
516                         # create repo+tag link owned by the project
517                         make_link(api, args.retries, 'docker_image_repo+tag',
518                                   image_repo_tag, **link_base)
519
520                     stdout.write(coll_uuid + "\n")
521
522                     sys.exit(0)
523
524         # Open a file for the saved image, and write it if needed.
525         image_file, need_save = prep_image_file(outfile_name)
526         if need_save:
527             save_image(image_hash, image_file)
528
529         # Call arv-put with switches we inherited from it
530         # (a.k.a., switches that aren't our own).
531         if arguments is None:
532             arguments = sys.argv[1:]
533         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
534         put_args = keepdocker_parser.parse_known_args(arguments)[1]
535
536         # Don't fail when cached manifest is invalid, just ignore the cache.
537         put_args += ['--batch']
538
539         if args.name is None:
540             put_args += ['--name', collection_name]
541
542         coll_uuid = arv_put.main(
543             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
544             install_sig_handlers=install_sig_handlers).strip()
545
546         # Managed properties could be already set
547         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
548         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
549         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
550
551         _, image_metadata = load_image_metadata(image_file)
552         link_base = {'head_uuid': coll_uuid, 'properties': {}}
553         if 'created' in image_metadata:
554             link_base['properties']['image_timestamp'] = image_metadata['created']
555         if args.project_uuid is not None:
556             link_base['owner_uuid'] = args.project_uuid
557
558         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
559         if image_repo_tag:
560             make_link(api, args.retries,
561                       'docker_image_repo+tag', image_repo_tag, **link_base)
562
563         # Clean up.
564         image_file.close()
565         for filename in [stat_cache_name(image_file), image_file.name]:
566             try:
567                 os.unlink(filename)
568             except OSError as error:
569                 if error.errno != errno.ENOENT:
570                     raise
571     finally:
572         if lockfile is not None:
573             # Closing the lockfile unlocks it.
574             lockfile.close()
575
576 if __name__ == '__main__':
577     main()