811f096c60808b2033c8123d5330909ee88db816
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import subprocess32 as subprocess
14 import sys
15 import tarfile
16 import tempfile
17 import shutil
18 import _strptime
19 import fcntl
20
21 from operator import itemgetter
22 from stat import *
23
24 import arvados
25 import arvados.util
26 import arvados.commands._util as arv_cmd
27 import arvados.commands.put as arv_put
28 from arvados.collection import CollectionReader
29 import ciso8601
30 import logging
31 import arvados.config
32
33 from arvados._version import __version__
34
35 logger = logging.getLogger('arvados.keepdocker')
36 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
37                 else logging.INFO)
38
39 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
40 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
41
42 DockerImage = collections.namedtuple(
43     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
44
45 keepdocker_parser = argparse.ArgumentParser(add_help=False)
46 keepdocker_parser.add_argument(
47     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
48     help='Print version and exit.')
49 keepdocker_parser.add_argument(
50     '-f', '--force', action='store_true', default=False,
51     help="Re-upload the image even if it already exists on the server")
52 keepdocker_parser.add_argument(
53     '--force-image-format', action='store_true', default=False,
54     help="Proceed even if the image format is not supported by the server")
55
56 _group = keepdocker_parser.add_mutually_exclusive_group()
57 _group.add_argument(
58     '--pull', action='store_true', default=False,
59     help="Try to pull the latest image from Docker registry")
60 _group.add_argument(
61     '--no-pull', action='store_false', dest='pull',
62     help="Use locally installed image only, don't pull image from Docker registry (default)")
63
64 keepdocker_parser.add_argument(
65     'image', nargs='?',
66     help="Docker image to upload: repo, repo:tag, or hash")
67 keepdocker_parser.add_argument(
68     'tag', nargs='?',
69     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
70
71 # Combine keepdocker options listed above with run_opts options of arv-put.
72 # The options inherited from arv-put include --name, --project-uuid,
73 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
74 arg_parser = argparse.ArgumentParser(
75         description="Upload or list Docker images in Arvados",
76         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
77
78 class DockerError(Exception):
79     pass
80
81
82 def popen_docker(cmd, *args, **kwargs):
83     manage_stdin = ('stdin' not in kwargs)
84     kwargs.setdefault('stdin', subprocess.PIPE)
85     kwargs.setdefault('stdout', sys.stderr)
86     try:
87         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
88     except OSError:  # No docker.io in $PATH
89         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
90     if manage_stdin:
91         docker_proc.stdin.close()
92     return docker_proc
93
94 def check_docker(proc, description):
95     proc.wait()
96     if proc.returncode != 0:
97         raise DockerError("docker {} returned status code {}".
98                           format(description, proc.returncode))
99
100 def docker_image_format(image_hash):
101     """Return the registry format ('v1' or 'v2') of the given image."""
102     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
103                         stdout=subprocess.PIPE)
104     try:
105         image_id = next(cmd.stdout).decode().strip()
106         if image_id.startswith('sha256:'):
107             return 'v2'
108         elif ':' not in image_id:
109             return 'v1'
110         else:
111             return 'unknown'
112     finally:
113         check_docker(cmd, "inspect")
114
115 def docker_image_compatible(api, image_hash):
116     supported = api._rootDesc.get('dockerImageFormats', [])
117     if not supported:
118         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
119         return False
120
121     fmt = docker_image_format(image_hash)
122     if fmt in supported:
123         return True
124     else:
125         logger.error("image format is {!r} " \
126             "but server supports only {!r}".format(fmt, supported))
127         return False
128
129 def docker_images():
130     # Yield a DockerImage tuple for each installed image.
131     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
132     list_output = iter(list_proc.stdout)
133     next(list_output)  # Ignore the header line
134     for line in list_output:
135         words = line.split()
136         words = [word.decode() for word in words]
137         size_index = len(words) - 2
138         repo, tag, imageid = words[:3]
139         ctime = ' '.join(words[3:size_index])
140         vsize = ' '.join(words[size_index:])
141         yield DockerImage(repo, tag, imageid, ctime, vsize)
142     list_proc.stdout.close()
143     check_docker(list_proc, "images")
144
145 def find_image_hashes(image_search, image_tag=None):
146     # Given one argument, search for Docker images with matching hashes,
147     # and return their full hashes in a set.
148     # Given two arguments, also search for a Docker image with the
149     # same repository and tag.  If one is found, return its hash in a
150     # set; otherwise, fall back to the one-argument hash search.
151     # Returns None if no match is found, or a hash search is ambiguous.
152     hash_search = image_search.lower()
153     hash_matches = set()
154     for image in docker_images():
155         if (image.repo == image_search) and (image.tag == image_tag):
156             return set([image.hash])
157         elif image.hash.startswith(hash_search):
158             hash_matches.add(image.hash)
159     return hash_matches
160
161 def find_one_image_hash(image_search, image_tag=None):
162     hashes = find_image_hashes(image_search, image_tag)
163     hash_count = len(hashes)
164     if hash_count == 1:
165         return hashes.pop()
166     elif hash_count == 0:
167         raise DockerError("no matching image found")
168     else:
169         raise DockerError("{} images match {}".format(hash_count, image_search))
170
171 def stat_cache_name(image_file):
172     return getattr(image_file, 'name', image_file) + '.stat'
173
174 def pull_image(image_name, image_tag):
175     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
176                  "pull")
177
178 def save_image(image_hash, image_file):
179     # Save the specified Docker image to image_file, then try to save its
180     # stats so we can try to resume after interruption.
181     check_docker(popen_docker(['save', image_hash], stdout=image_file),
182                  "save")
183     image_file.flush()
184     try:
185         with open(stat_cache_name(image_file), 'w') as statfile:
186             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
187     except STAT_CACHE_ERRORS:
188         pass  # We won't resume from this cache.  No big deal.
189
190 def get_cache_dir():
191     return arv_cmd.make_home_conf_dir(
192         os.path.join('.cache', 'arvados', 'docker'), 0o700)
193
194 def prep_image_file(filename):
195     # Return a file object ready to save a Docker image,
196     # and a boolean indicating whether or not we need to actually save the
197     # image (False if a cached save is available).
198     cache_dir = get_cache_dir()
199     if cache_dir is None:
200         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
201         need_save = True
202     else:
203         file_path = os.path.join(cache_dir, filename)
204         try:
205             with open(stat_cache_name(file_path)) as statfile:
206                 prev_stat = json.load(statfile)
207             now_stat = os.stat(file_path)
208             need_save = any(prev_stat[field] != now_stat[field]
209                             for field in [ST_MTIME, ST_SIZE])
210         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
211             need_save = True  # We couldn't compare against old stats
212         image_file = open(file_path, 'w+b' if need_save else 'rb')
213     return image_file, need_save
214
215 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
216     link_attrs.update({'link_class': link_class, 'name': link_name})
217     return api_client.links().create(body=link_attrs).execute(
218         num_retries=num_retries)
219
220 def docker_link_sort_key(link):
221     """Build a sort key to find the latest available Docker image.
222
223     To find one source collection for a Docker image referenced by
224     name or image id, the API server looks for a link with the most
225     recent `image_timestamp` property; then the most recent
226     `created_at` timestamp.  This method generates a sort key for
227     Docker metadata links to sort them from least to most preferred.
228     """
229     try:
230         image_timestamp = ciso8601.parse_datetime_unaware(
231             link['properties']['image_timestamp'])
232     except (KeyError, ValueError):
233         image_timestamp = EARLIEST_DATETIME
234     return (image_timestamp,
235             ciso8601.parse_datetime_unaware(link['created_at']))
236
237 def _get_docker_links(api_client, num_retries, **kwargs):
238     links = arvados.util.list_all(api_client.links().list,
239                                   num_retries, **kwargs)
240     for link in links:
241         link['_sort_key'] = docker_link_sort_key(link)
242     links.sort(key=itemgetter('_sort_key'), reverse=True)
243     return links
244
245 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
246     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
247     return {
248         '_sort_key': link['_sort_key'],
249         'timestamp': link['_sort_key'][timestamp_index],
250         'collection': link['head_uuid'],
251         'dockerhash': dockerhash,
252         'repo': repo,
253         'tag': tag,
254         }
255
256 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
257     """List all Docker images known to the api_client with image_name and
258     image_tag.  If no image_name is given, defaults to listing all
259     Docker images.
260
261     Returns a list of tuples representing matching Docker images,
262     sorted in preference order (i.e. the first collection in the list
263     is the one that the API server would use). Each tuple is a
264     (collection_uuid, collection_info) pair, where collection_info is
265     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
266
267     """
268     search_filters = []
269     repo_links = None
270     hash_links = None
271     if image_name:
272         # Find images with the name the user specified.
273         search_links = _get_docker_links(
274             api_client, num_retries,
275             filters=[['link_class', '=', 'docker_image_repo+tag'],
276                      ['name', '=',
277                       '{}:{}'.format(image_name, image_tag or 'latest')]])
278         if search_links:
279             repo_links = search_links
280         else:
281             # Fall back to finding images with the specified image hash.
282             search_links = _get_docker_links(
283                 api_client, num_retries,
284                 filters=[['link_class', '=', 'docker_image_hash'],
285                          ['name', 'ilike', image_name + '%']])
286             hash_links = search_links
287         # Only list information about images that were found in the search.
288         search_filters.append(['head_uuid', 'in',
289                                [link['head_uuid'] for link in search_links]])
290
291     # It should be reasonable to expect that each collection only has one
292     # image hash (though there may be many links specifying this).  Find
293     # the API server's most preferred image hash link for each collection.
294     if hash_links is None:
295         hash_links = _get_docker_links(
296             api_client, num_retries,
297             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
298     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
299
300     # Each collection may have more than one name (though again, one name
301     # may be specified more than once).  Build an image listing from name
302     # tags, sorted by API server preference.
303     if repo_links is None:
304         repo_links = _get_docker_links(
305             api_client, num_retries,
306             filters=search_filters + [['link_class', '=',
307                                        'docker_image_repo+tag']])
308     seen_image_names = collections.defaultdict(set)
309     images = []
310     for link in repo_links:
311         collection_uuid = link['head_uuid']
312         if link['name'] in seen_image_names[collection_uuid]:
313             continue
314         seen_image_names[collection_uuid].add(link['name'])
315         try:
316             dockerhash = hash_link_map[collection_uuid]['name']
317         except KeyError:
318             dockerhash = '<unknown>'
319         name_parts = link['name'].split(':', 1)
320         images.append(_new_image_listing(link, dockerhash, *name_parts))
321
322     # Find any image hash links that did not have a corresponding name link,
323     # and add image listings for them, retaining the API server preference
324     # sorting.
325     images_start_size = len(images)
326     for collection_uuid, link in hash_link_map.items():
327         if not seen_image_names[collection_uuid]:
328             images.append(_new_image_listing(link, link['name']))
329     if len(images) > images_start_size:
330         images.sort(key=itemgetter('_sort_key'), reverse=True)
331
332     # Remove any image listings that refer to unknown collections.
333     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
334             api_client.collections().list, num_retries,
335             filters=[['uuid', 'in', [im['collection'] for im in images]]],
336             select=['uuid'])}
337     return [(image['collection'], image) for image in images
338             if image['collection'] in existing_coll_uuids]
339
340 def items_owned_by(owner_uuid, arv_items):
341     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
342
343 def _uuid2pdh(api, uuid):
344     return api.collections().list(
345         filters=[['uuid', '=', uuid]],
346         select=['portable_data_hash'],
347     ).execute()['items'][0]['portable_data_hash']
348
349 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
350     args = arg_parser.parse_args(arguments)
351     if api is None:
352         api = arvados.api('v1')
353
354     if args.image is None or args.image == 'images':
355         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
356         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
357         try:
358             for i, j in list_images_in_arv(api, args.retries):
359                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
360         except IOError as e:
361             if e.errno == errno.EPIPE:
362                 pass
363             else:
364                 raise
365         sys.exit(0)
366
367     if re.search(r':\w[-.\w]{0,127}$', args.image):
368         # image ends with :valid-tag
369         if args.tag is not None:
370             logger.error(
371                 "image %r already includes a tag, cannot add tag argument %r",
372                 args.image, args.tag)
373             sys.exit(1)
374         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
375         args.image, args.tag = args.image.rsplit(':', 1)
376     elif args.tag is None:
377         args.tag = 'latest'
378
379     # Pull the image if requested, unless the image is specified as a hash
380     # that we already have.
381     if args.pull and not find_image_hashes(args.image):
382         pull_image(args.image, args.tag)
383
384     try:
385         image_hash = find_one_image_hash(args.image, args.tag)
386     except DockerError as error:
387         logger.error(error.message)
388         sys.exit(1)
389
390     if not docker_image_compatible(api, image_hash):
391         if args.force_image_format:
392             logger.warning("forcing incompatible image")
393         else:
394             logger.error("refusing to store " \
395                 "incompatible format (use --force-image-format to override)")
396             sys.exit(1)
397
398     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
399
400     if args.name is None:
401         if image_repo_tag:
402             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
403         else:
404             collection_name = 'Docker image {}'.format(image_hash[0:12])
405     else:
406         collection_name = args.name
407
408     # Acquire a lock so that only one arv-keepdocker process will
409     # dump/upload a particular docker image at a time.  Do this before
410     # checking if the image already exists in Arvados so that if there
411     # is an upload already underway, when that upload completes and
412     # this process gets a turn, it will discover the Docker image is
413     # already available and exit quickly.
414     outfile_name = '{}.tar'.format(image_hash)
415     lockfile_name = '{}.lock'.format(outfile_name)
416     lockfile = None
417     cache_dir = get_cache_dir()
418     if cache_dir:
419         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
420         fcntl.flock(lockfile, fcntl.LOCK_EX)
421
422     try:
423         if not args.force:
424             # Check if this image is already in Arvados.
425
426             # Project where everything should be owned
427             parent_project_uuid = args.project_uuid or api.users().current().execute(
428                 num_retries=args.retries)['uuid']
429
430             # Find image hash tags
431             existing_links = _get_docker_links(
432                 api, args.retries,
433                 filters=[['link_class', '=', 'docker_image_hash'],
434                          ['name', '=', image_hash]])
435             if existing_links:
436                 # get readable collections
437                 collections = api.collections().list(
438                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
439                     select=["uuid", "owner_uuid", "name", "manifest_text"]
440                     ).execute(num_retries=args.retries)['items']
441
442                 if collections:
443                     # check for repo+tag links on these collections
444                     if image_repo_tag:
445                         existing_repo_tag = _get_docker_links(
446                             api, args.retries,
447                             filters=[['link_class', '=', 'docker_image_repo+tag'],
448                                      ['name', '=', image_repo_tag],
449                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
450                     else:
451                         existing_repo_tag = []
452
453                     try:
454                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
455                     except StopIteration:
456                         # create new collection owned by the project
457                         coll_uuid = api.collections().create(
458                             body={"manifest_text": collections[0]['manifest_text'],
459                                   "name": collection_name,
460                                   "owner_uuid": parent_project_uuid},
461                             ensure_unique_name=True
462                             ).execute(num_retries=args.retries)['uuid']
463
464                     link_base = {'owner_uuid': parent_project_uuid,
465                                  'head_uuid':  coll_uuid,
466                                  'properties': existing_links[0]['properties']}
467
468                     if not any(items_owned_by(parent_project_uuid, existing_links)):
469                         # create image link owned by the project
470                         make_link(api, args.retries,
471                                   'docker_image_hash', image_hash, **link_base)
472
473                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
474                         # create repo+tag link owned by the project
475                         make_link(api, args.retries, 'docker_image_repo+tag',
476                                   image_repo_tag, **link_base)
477
478                     stdout.write(coll_uuid + "\n")
479
480                     sys.exit(0)
481
482         # Open a file for the saved image, and write it if needed.
483         image_file, need_save = prep_image_file(outfile_name)
484         if need_save:
485             save_image(image_hash, image_file)
486
487         # Call arv-put with switches we inherited from it
488         # (a.k.a., switches that aren't our own).
489         put_args = keepdocker_parser.parse_known_args(arguments)[1]
490
491         if args.name is None:
492             put_args += ['--name', collection_name]
493
494         coll_uuid = arv_put.main(
495             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
496             install_sig_handlers=install_sig_handlers).strip()
497
498         # Read the image metadata and make Arvados links from it.
499         image_file.seek(0)
500         image_tar = tarfile.open(fileobj=image_file)
501         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
502         if image_hash_type:
503             json_filename = raw_image_hash + '.json'
504         else:
505             json_filename = raw_image_hash + '/json'
506         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
507         image_metadata = json.loads(json_file.read().decode())
508         json_file.close()
509         image_tar.close()
510         link_base = {'head_uuid': coll_uuid, 'properties': {}}
511         if 'created' in image_metadata:
512             link_base['properties']['image_timestamp'] = image_metadata['created']
513         if args.project_uuid is not None:
514             link_base['owner_uuid'] = args.project_uuid
515
516         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
517         if image_repo_tag:
518             make_link(api, args.retries,
519                       'docker_image_repo+tag', image_repo_tag, **link_base)
520
521         # Clean up.
522         image_file.close()
523         for filename in [stat_cache_name(image_file), image_file.name]:
524             try:
525                 os.unlink(filename)
526             except OSError as error:
527                 if error.errno != errno.ENOENT:
528                     raise
529     finally:
530         if lockfile is not None:
531             # Closing the lockfile unlocks it.
532             lockfile.close()
533
534 if __name__ == '__main__':
535     main()