354ae50822ccc16791d0ace7ab8be54d6e0dbaf6
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 #!/usr/bin/env python
2
3 from builtins import next
4 import argparse
5 import collections
6 import datetime
7 import errno
8 import json
9 import os
10 import re
11 import subprocess
12 import sys
13 import tarfile
14 import tempfile
15 import shutil
16 import _strptime
17
18 from operator import itemgetter
19 from stat import *
20
21 import arvados
22 import arvados.util
23 import arvados.commands._util as arv_cmd
24 import arvados.commands.put as arv_put
25 from arvados.collection import CollectionReader
26 import ciso8601
27 import logging
28 import arvados.config
29
30 from arvados._version import __version__
31
32 logger = logging.getLogger('arvados.keepdocker')
33 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
34                 else logging.INFO)
35
36 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
37 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
38
39 DockerImage = collections.namedtuple(
40     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
41
42 keepdocker_parser = argparse.ArgumentParser(add_help=False)
43 keepdocker_parser.add_argument(
44     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
45     help='Print version and exit.')
46 keepdocker_parser.add_argument(
47     '-f', '--force', action='store_true', default=False,
48     help="Re-upload the image even if it already exists on the server")
49 keepdocker_parser.add_argument(
50     '--force-image-format', action='store_true', default=False,
51     help="Proceed even if the image format is not supported by the server")
52
53 _group = keepdocker_parser.add_mutually_exclusive_group()
54 _group.add_argument(
55     '--pull', action='store_true', default=False,
56     help="Try to pull the latest image from Docker registry")
57 _group.add_argument(
58     '--no-pull', action='store_false', dest='pull',
59     help="Use locally installed image only, don't pull image from Docker registry (default)")
60
61 keepdocker_parser.add_argument(
62     'image', nargs='?',
63     help="Docker image to upload, as a repository name or hash")
64 keepdocker_parser.add_argument(
65     'tag', nargs='?', default='latest',
66     help="Tag of the Docker image to upload (default 'latest')")
67
68 # Combine keepdocker options listed above with run_opts options of arv-put.
69 # The options inherited from arv-put include --name, --project-uuid,
70 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
71 arg_parser = argparse.ArgumentParser(
72         description="Upload or list Docker images in Arvados",
73         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
74
75 class DockerError(Exception):
76     pass
77
78
79 def popen_docker(cmd, *args, **kwargs):
80     manage_stdin = ('stdin' not in kwargs)
81     kwargs.setdefault('stdin', subprocess.PIPE)
82     kwargs.setdefault('stdout', sys.stderr)
83     try:
84         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
85     except OSError:  # No docker.io in $PATH
86         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
87     if manage_stdin:
88         docker_proc.stdin.close()
89     return docker_proc
90
91 def check_docker(proc, description):
92     proc.wait()
93     if proc.returncode != 0:
94         raise DockerError("docker {} returned status code {}".
95                           format(description, proc.returncode))
96
97 def docker_image_format(image_hash):
98     """Return the registry format ('v1' or 'v2') of the given image."""
99     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
100                         stdout=subprocess.PIPE)
101     try:
102         image_id = next(cmd.stdout).decode().strip()
103         if image_id.startswith('sha256:'):
104             return 'v2'
105         elif ':' not in image_id:
106             return 'v1'
107         else:
108             return 'unknown'
109     finally:
110         check_docker(cmd, "inspect")
111
112 def docker_image_compatible(api, image_hash):
113     supported = api._rootDesc.get('dockerImageFormats', [])
114     if not supported:
115         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
116         return False
117
118     fmt = docker_image_format(image_hash)
119     if fmt in supported:
120         return True
121     else:
122         logger.error("image format is {!r} " \
123             "but server supports only {!r}".format(fmt, supported))
124         return False
125
126 def docker_images():
127     # Yield a DockerImage tuple for each installed image.
128     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
129     list_output = iter(list_proc.stdout)
130     next(list_output)  # Ignore the header line
131     for line in list_output:
132         words = line.split()
133         size_index = len(words) - 2
134         repo, tag, imageid = words[:3]
135         ctime = ' '.join(words[3:size_index])
136         vsize = ' '.join(words[size_index:])
137         yield DockerImage(repo, tag, imageid, ctime, vsize)
138     list_proc.stdout.close()
139     check_docker(list_proc, "images")
140
141 def find_image_hashes(image_search, image_tag=None):
142     # Given one argument, search for Docker images with matching hashes,
143     # and return their full hashes in a set.
144     # Given two arguments, also search for a Docker image with the
145     # same repository and tag.  If one is found, return its hash in a
146     # set; otherwise, fall back to the one-argument hash search.
147     # Returns None if no match is found, or a hash search is ambiguous.
148     hash_search = image_search.lower()
149     hash_matches = set()
150     for image in docker_images():
151         if (image.repo == image_search) and (image.tag == image_tag):
152             return set([image.hash])
153         elif image.hash.startswith(hash_search):
154             hash_matches.add(image.hash)
155     return hash_matches
156
157 def find_one_image_hash(image_search, image_tag=None):
158     hashes = find_image_hashes(image_search, image_tag)
159     hash_count = len(hashes)
160     if hash_count == 1:
161         return hashes.pop()
162     elif hash_count == 0:
163         raise DockerError("no matching image found")
164     else:
165         raise DockerError("{} images match {}".format(hash_count, image_search))
166
167 def stat_cache_name(image_file):
168     return getattr(image_file, 'name', image_file) + '.stat'
169
170 def pull_image(image_name, image_tag):
171     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
172                  "pull")
173
174 def save_image(image_hash, image_file):
175     # Save the specified Docker image to image_file, then try to save its
176     # stats so we can try to resume after interruption.
177     check_docker(popen_docker(['save', image_hash], stdout=image_file),
178                  "save")
179     image_file.flush()
180     try:
181         with open(stat_cache_name(image_file), 'w') as statfile:
182             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
183     except STAT_CACHE_ERRORS:
184         pass  # We won't resume from this cache.  No big deal.
185
186 def prep_image_file(filename):
187     # Return a file object ready to save a Docker image,
188     # and a boolean indicating whether or not we need to actually save the
189     # image (False if a cached save is available).
190     cache_dir = arv_cmd.make_home_conf_dir(
191         os.path.join('.cache', 'arvados', 'docker'), 0o700)
192     if cache_dir is None:
193         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
194         need_save = True
195     else:
196         file_path = os.path.join(cache_dir, filename)
197         try:
198             with open(stat_cache_name(file_path)) as statfile:
199                 prev_stat = json.load(statfile)
200             now_stat = os.stat(file_path)
201             need_save = any(prev_stat[field] != now_stat[field]
202                             for field in [ST_MTIME, ST_SIZE])
203         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
204             need_save = True  # We couldn't compare against old stats
205         image_file = open(file_path, 'w+b' if need_save else 'rb')
206     return image_file, need_save
207
208 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
209     link_attrs.update({'link_class': link_class, 'name': link_name})
210     return api_client.links().create(body=link_attrs).execute(
211         num_retries=num_retries)
212
213 def docker_link_sort_key(link):
214     """Build a sort key to find the latest available Docker image.
215
216     To find one source collection for a Docker image referenced by
217     name or image id, the API server looks for a link with the most
218     recent `image_timestamp` property; then the most recent
219     `created_at` timestamp.  This method generates a sort key for
220     Docker metadata links to sort them from least to most preferred.
221     """
222     try:
223         image_timestamp = ciso8601.parse_datetime_unaware(
224             link['properties']['image_timestamp'])
225     except (KeyError, ValueError):
226         image_timestamp = EARLIEST_DATETIME
227     return (image_timestamp,
228             ciso8601.parse_datetime_unaware(link['created_at']))
229
230 def _get_docker_links(api_client, num_retries, **kwargs):
231     links = arvados.util.list_all(api_client.links().list,
232                                   num_retries, **kwargs)
233     for link in links:
234         link['_sort_key'] = docker_link_sort_key(link)
235     links.sort(key=itemgetter('_sort_key'), reverse=True)
236     return links
237
238 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
239     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
240     return {
241         '_sort_key': link['_sort_key'],
242         'timestamp': link['_sort_key'][timestamp_index],
243         'collection': link['head_uuid'],
244         'dockerhash': dockerhash,
245         'repo': repo,
246         'tag': tag,
247         }
248
249 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
250     """List all Docker images known to the api_client with image_name and
251     image_tag.  If no image_name is given, defaults to listing all
252     Docker images.
253
254     Returns a list of tuples representing matching Docker images,
255     sorted in preference order (i.e. the first collection in the list
256     is the one that the API server would use). Each tuple is a
257     (collection_uuid, collection_info) pair, where collection_info is
258     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
259
260     """
261     search_filters = []
262     repo_links = None
263     hash_links = None
264     if image_name:
265         # Find images with the name the user specified.
266         search_links = _get_docker_links(
267             api_client, num_retries,
268             filters=[['link_class', '=', 'docker_image_repo+tag'],
269                      ['name', '=',
270                       '{}:{}'.format(image_name, image_tag or 'latest')]])
271         if search_links:
272             repo_links = search_links
273         else:
274             # Fall back to finding images with the specified image hash.
275             search_links = _get_docker_links(
276                 api_client, num_retries,
277                 filters=[['link_class', '=', 'docker_image_hash'],
278                          ['name', 'ilike', image_name + '%']])
279             hash_links = search_links
280         # Only list information about images that were found in the search.
281         search_filters.append(['head_uuid', 'in',
282                                [link['head_uuid'] for link in search_links]])
283
284     # It should be reasonable to expect that each collection only has one
285     # image hash (though there may be many links specifying this).  Find
286     # the API server's most preferred image hash link for each collection.
287     if hash_links is None:
288         hash_links = _get_docker_links(
289             api_client, num_retries,
290             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
291     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
292
293     # Each collection may have more than one name (though again, one name
294     # may be specified more than once).  Build an image listing from name
295     # tags, sorted by API server preference.
296     if repo_links is None:
297         repo_links = _get_docker_links(
298             api_client, num_retries,
299             filters=search_filters + [['link_class', '=',
300                                        'docker_image_repo+tag']])
301     seen_image_names = collections.defaultdict(set)
302     images = []
303     for link in repo_links:
304         collection_uuid = link['head_uuid']
305         if link['name'] in seen_image_names[collection_uuid]:
306             continue
307         seen_image_names[collection_uuid].add(link['name'])
308         try:
309             dockerhash = hash_link_map[collection_uuid]['name']
310         except KeyError:
311             dockerhash = '<unknown>'
312         name_parts = link['name'].split(':', 1)
313         images.append(_new_image_listing(link, dockerhash, *name_parts))
314
315     # Find any image hash links that did not have a corresponding name link,
316     # and add image listings for them, retaining the API server preference
317     # sorting.
318     images_start_size = len(images)
319     for collection_uuid, link in hash_link_map.items():
320         if not seen_image_names[collection_uuid]:
321             images.append(_new_image_listing(link, link['name']))
322     if len(images) > images_start_size:
323         images.sort(key=itemgetter('_sort_key'), reverse=True)
324
325     # Remove any image listings that refer to unknown collections.
326     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
327             api_client.collections().list, num_retries,
328             filters=[['uuid', 'in', [im['collection'] for im in images]]],
329             select=['uuid'])}
330     return [(image['collection'], image) for image in images
331             if image['collection'] in existing_coll_uuids]
332
333 def items_owned_by(owner_uuid, arv_items):
334     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
335
336 def _uuid2pdh(api, uuid):
337     return api.collections().list(
338         filters=[['uuid', '=', uuid]],
339         select=['portable_data_hash'],
340     ).execute()['items'][0]['portable_data_hash']
341
342 def main(arguments=None, stdout=sys.stdout):
343     args = arg_parser.parse_args(arguments)
344     api = arvados.api('v1')
345
346     if args.image is None or args.image == 'images':
347         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
348         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
349         try:
350             for i, j in list_images_in_arv(api, args.retries):
351                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
352         except IOError as e:
353             if e.errno == errno.EPIPE:
354                 pass
355             else:
356                 raise
357         sys.exit(0)
358
359     # Pull the image if requested, unless the image is specified as a hash
360     # that we already have.
361     if args.pull and not find_image_hashes(args.image):
362         pull_image(args.image, args.tag)
363
364     try:
365         image_hash = find_one_image_hash(args.image, args.tag)
366     except DockerError as error:
367         logger.error(error.message)
368         sys.exit(1)
369
370     if not docker_image_compatible(api, image_hash):
371         if args.force_image_format:
372             logger.warning("forcing incompatible image")
373         else:
374             logger.error("refusing to store " \
375                 "incompatible format (use --force-image-format to override)")
376             sys.exit(1)
377
378     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
379
380     if args.name is None:
381         if image_repo_tag:
382             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
383         else:
384             collection_name = 'Docker image {}'.format(image_hash[0:12])
385     else:
386         collection_name = args.name
387
388     if not args.force:
389         # Check if this image is already in Arvados.
390
391         # Project where everything should be owned
392         if args.project_uuid:
393             parent_project_uuid = args.project_uuid
394         else:
395             parent_project_uuid = api.users().current().execute(
396                 num_retries=args.retries)['uuid']
397
398         # Find image hash tags
399         existing_links = _get_docker_links(
400             api, args.retries,
401             filters=[['link_class', '=', 'docker_image_hash'],
402                      ['name', '=', image_hash]])
403         if existing_links:
404             # get readable collections
405             collections = api.collections().list(
406                 filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
407                 select=["uuid", "owner_uuid", "name", "manifest_text"]
408                 ).execute(num_retries=args.retries)['items']
409
410             if collections:
411                 # check for repo+tag links on these collections
412                 if image_repo_tag:
413                     existing_repo_tag = _get_docker_links(
414                         api, args.retries,
415                         filters=[['link_class', '=', 'docker_image_repo+tag'],
416                                  ['name', '=', image_repo_tag],
417                                  ['head_uuid', 'in', [c["uuid"] for c in collections]]])
418                 else:
419                     existing_repo_tag = []
420
421                 try:
422                     coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
423                 except StopIteration:
424                     # create new collection owned by the project
425                     coll_uuid = api.collections().create(
426                         body={"manifest_text": collections[0]['manifest_text'],
427                               "name": collection_name,
428                               "owner_uuid": parent_project_uuid},
429                         ensure_unique_name=True
430                         ).execute(num_retries=args.retries)['uuid']
431
432                 link_base = {'owner_uuid': parent_project_uuid,
433                              'head_uuid':  coll_uuid,
434                              'properties': existing_links[0]['properties']}
435
436                 if not any(items_owned_by(parent_project_uuid, existing_links)):
437                     # create image link owned by the project
438                     make_link(api, args.retries,
439                               'docker_image_hash', image_hash, **link_base)
440
441                 if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
442                     # create repo+tag link owned by the project
443                     make_link(api, args.retries, 'docker_image_repo+tag',
444                               image_repo_tag, **link_base)
445
446                 stdout.write(coll_uuid + "\n")
447
448                 sys.exit(0)
449
450     # Open a file for the saved image, and write it if needed.
451     outfile_name = '{}.tar'.format(image_hash)
452     image_file, need_save = prep_image_file(outfile_name)
453     if need_save:
454         save_image(image_hash, image_file)
455
456     # Call arv-put with switches we inherited from it
457     # (a.k.a., switches that aren't our own).
458     put_args = keepdocker_parser.parse_known_args(arguments)[1]
459
460     if args.name is None:
461         put_args += ['--name', collection_name]
462
463     coll_uuid = arv_put.main(
464         put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
465
466     # Read the image metadata and make Arvados links from it.
467     image_file.seek(0)
468     image_tar = tarfile.open(fileobj=image_file)
469     image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
470     if image_hash_type:
471         json_filename = raw_image_hash + '.json'
472     else:
473         json_filename = raw_image_hash + '/json'
474     json_file = image_tar.extractfile(image_tar.getmember(json_filename))
475     image_metadata = json.load(json_file)
476     json_file.close()
477     image_tar.close()
478     link_base = {'head_uuid': coll_uuid, 'properties': {}}
479     if 'created' in image_metadata:
480         link_base['properties']['image_timestamp'] = image_metadata['created']
481     if args.project_uuid is not None:
482         link_base['owner_uuid'] = args.project_uuid
483
484     make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
485     if image_repo_tag:
486         make_link(api, args.retries,
487                   'docker_image_repo+tag', image_repo_tag, **link_base)
488
489     # Clean up.
490     image_file.close()
491     for filename in [stat_cache_name(image_file), image_file.name]:
492         try:
493             os.unlink(filename)
494         except OSError as error:
495             if error.errno != errno.ENOENT:
496                 raise
497
498 if __name__ == '__main__':
499     main()