Python 3: only import subprocess32 on py27
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import sys
14 import tarfile
15 import tempfile
16 import shutil
17 import _strptime
18 import fcntl
19 if sys.version_info[0] < 3:
20     import subprocess32 as subprocess
21 else:
22     import subprocess
23
24 from operator import itemgetter
25 from stat import *
26
27 import arvados
28 import arvados.util
29 import arvados.commands._util as arv_cmd
30 import arvados.commands.put as arv_put
31 from arvados.collection import CollectionReader
32 import ciso8601
33 import logging
34 import arvados.config
35
36 from arvados._version import __version__
37
38 logger = logging.getLogger('arvados.keepdocker')
39 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
40                 else logging.INFO)
41
42 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
43 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
44
45 DockerImage = collections.namedtuple(
46     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
47
48 keepdocker_parser = argparse.ArgumentParser(add_help=False)
49 keepdocker_parser.add_argument(
50     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
51     help='Print version and exit.')
52 keepdocker_parser.add_argument(
53     '-f', '--force', action='store_true', default=False,
54     help="Re-upload the image even if it already exists on the server")
55 keepdocker_parser.add_argument(
56     '--force-image-format', action='store_true', default=False,
57     help="Proceed even if the image format is not supported by the server")
58
59 _group = keepdocker_parser.add_mutually_exclusive_group()
60 _group.add_argument(
61     '--pull', action='store_true', default=False,
62     help="Try to pull the latest image from Docker registry")
63 _group.add_argument(
64     '--no-pull', action='store_false', dest='pull',
65     help="Use locally installed image only, don't pull image from Docker registry (default)")
66
67 keepdocker_parser.add_argument(
68     'image', nargs='?',
69     help="Docker image to upload: repo, repo:tag, or hash")
70 keepdocker_parser.add_argument(
71     'tag', nargs='?',
72     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
73
74 # Combine keepdocker options listed above with run_opts options of arv-put.
75 # The options inherited from arv-put include --name, --project-uuid,
76 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
77 arg_parser = argparse.ArgumentParser(
78         description="Upload or list Docker images in Arvados",
79         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
80
81 class DockerError(Exception):
82     pass
83
84
85 def popen_docker(cmd, *args, **kwargs):
86     manage_stdin = ('stdin' not in kwargs)
87     kwargs.setdefault('stdin', subprocess.PIPE)
88     kwargs.setdefault('stdout', sys.stderr)
89     try:
90         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
91     except OSError:  # No docker.io in $PATH
92         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
93     if manage_stdin:
94         docker_proc.stdin.close()
95     return docker_proc
96
97 def check_docker(proc, description):
98     proc.wait()
99     if proc.returncode != 0:
100         raise DockerError("docker {} returned status code {}".
101                           format(description, proc.returncode))
102
103 def docker_image_format(image_hash):
104     """Return the registry format ('v1' or 'v2') of the given image."""
105     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
106                         stdout=subprocess.PIPE)
107     try:
108         image_id = next(cmd.stdout).decode().strip()
109         if image_id.startswith('sha256:'):
110             return 'v2'
111         elif ':' not in image_id:
112             return 'v1'
113         else:
114             return 'unknown'
115     finally:
116         check_docker(cmd, "inspect")
117
118 def docker_image_compatible(api, image_hash):
119     supported = api._rootDesc.get('dockerImageFormats', [])
120     if not supported:
121         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
122         return False
123
124     fmt = docker_image_format(image_hash)
125     if fmt in supported:
126         return True
127     else:
128         logger.error("image format is {!r} " \
129             "but server supports only {!r}".format(fmt, supported))
130         return False
131
132 def docker_images():
133     # Yield a DockerImage tuple for each installed image.
134     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
135     list_output = iter(list_proc.stdout)
136     next(list_output)  # Ignore the header line
137     for line in list_output:
138         words = line.split()
139         words = [word.decode() for word in words]
140         size_index = len(words) - 2
141         repo, tag, imageid = words[:3]
142         ctime = ' '.join(words[3:size_index])
143         vsize = ' '.join(words[size_index:])
144         yield DockerImage(repo, tag, imageid, ctime, vsize)
145     list_proc.stdout.close()
146     check_docker(list_proc, "images")
147
148 def find_image_hashes(image_search, image_tag=None):
149     # Given one argument, search for Docker images with matching hashes,
150     # and return their full hashes in a set.
151     # Given two arguments, also search for a Docker image with the
152     # same repository and tag.  If one is found, return its hash in a
153     # set; otherwise, fall back to the one-argument hash search.
154     # Returns None if no match is found, or a hash search is ambiguous.
155     hash_search = image_search.lower()
156     hash_matches = set()
157     for image in docker_images():
158         if (image.repo == image_search) and (image.tag == image_tag):
159             return set([image.hash])
160         elif image.hash.startswith(hash_search):
161             hash_matches.add(image.hash)
162     return hash_matches
163
164 def find_one_image_hash(image_search, image_tag=None):
165     hashes = find_image_hashes(image_search, image_tag)
166     hash_count = len(hashes)
167     if hash_count == 1:
168         return hashes.pop()
169     elif hash_count == 0:
170         raise DockerError("no matching image found")
171     else:
172         raise DockerError("{} images match {}".format(hash_count, image_search))
173
174 def stat_cache_name(image_file):
175     return getattr(image_file, 'name', image_file) + '.stat'
176
177 def pull_image(image_name, image_tag):
178     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
179                  "pull")
180
181 def save_image(image_hash, image_file):
182     # Save the specified Docker image to image_file, then try to save its
183     # stats so we can try to resume after interruption.
184     check_docker(popen_docker(['save', image_hash], stdout=image_file),
185                  "save")
186     image_file.flush()
187     try:
188         with open(stat_cache_name(image_file), 'w') as statfile:
189             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
190     except STAT_CACHE_ERRORS:
191         pass  # We won't resume from this cache.  No big deal.
192
193 def get_cache_dir():
194     return arv_cmd.make_home_conf_dir(
195         os.path.join('.cache', 'arvados', 'docker'), 0o700)
196
197 def prep_image_file(filename):
198     # Return a file object ready to save a Docker image,
199     # and a boolean indicating whether or not we need to actually save the
200     # image (False if a cached save is available).
201     cache_dir = get_cache_dir()
202     if cache_dir is None:
203         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
204         need_save = True
205     else:
206         file_path = os.path.join(cache_dir, filename)
207         try:
208             with open(stat_cache_name(file_path)) as statfile:
209                 prev_stat = json.load(statfile)
210             now_stat = os.stat(file_path)
211             need_save = any(prev_stat[field] != now_stat[field]
212                             for field in [ST_MTIME, ST_SIZE])
213         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
214             need_save = True  # We couldn't compare against old stats
215         image_file = open(file_path, 'w+b' if need_save else 'rb')
216     return image_file, need_save
217
218 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
219     link_attrs.update({'link_class': link_class, 'name': link_name})
220     return api_client.links().create(body=link_attrs).execute(
221         num_retries=num_retries)
222
223 def docker_link_sort_key(link):
224     """Build a sort key to find the latest available Docker image.
225
226     To find one source collection for a Docker image referenced by
227     name or image id, the API server looks for a link with the most
228     recent `image_timestamp` property; then the most recent
229     `created_at` timestamp.  This method generates a sort key for
230     Docker metadata links to sort them from least to most preferred.
231     """
232     try:
233         image_timestamp = ciso8601.parse_datetime_unaware(
234             link['properties']['image_timestamp'])
235     except (KeyError, ValueError):
236         image_timestamp = EARLIEST_DATETIME
237     return (image_timestamp,
238             ciso8601.parse_datetime_unaware(link['created_at']))
239
240 def _get_docker_links(api_client, num_retries, **kwargs):
241     links = arvados.util.list_all(api_client.links().list,
242                                   num_retries, **kwargs)
243     for link in links:
244         link['_sort_key'] = docker_link_sort_key(link)
245     links.sort(key=itemgetter('_sort_key'), reverse=True)
246     return links
247
248 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
249     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
250     return {
251         '_sort_key': link['_sort_key'],
252         'timestamp': link['_sort_key'][timestamp_index],
253         'collection': link['head_uuid'],
254         'dockerhash': dockerhash,
255         'repo': repo,
256         'tag': tag,
257         }
258
259 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
260     """List all Docker images known to the api_client with image_name and
261     image_tag.  If no image_name is given, defaults to listing all
262     Docker images.
263
264     Returns a list of tuples representing matching Docker images,
265     sorted in preference order (i.e. the first collection in the list
266     is the one that the API server would use). Each tuple is a
267     (collection_uuid, collection_info) pair, where collection_info is
268     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
269
270     """
271     search_filters = []
272     repo_links = None
273     hash_links = None
274     if image_name:
275         # Find images with the name the user specified.
276         search_links = _get_docker_links(
277             api_client, num_retries,
278             filters=[['link_class', '=', 'docker_image_repo+tag'],
279                      ['name', '=',
280                       '{}:{}'.format(image_name, image_tag or 'latest')]])
281         if search_links:
282             repo_links = search_links
283         else:
284             # Fall back to finding images with the specified image hash.
285             search_links = _get_docker_links(
286                 api_client, num_retries,
287                 filters=[['link_class', '=', 'docker_image_hash'],
288                          ['name', 'ilike', image_name + '%']])
289             hash_links = search_links
290         # Only list information about images that were found in the search.
291         search_filters.append(['head_uuid', 'in',
292                                [link['head_uuid'] for link in search_links]])
293
294     # It should be reasonable to expect that each collection only has one
295     # image hash (though there may be many links specifying this).  Find
296     # the API server's most preferred image hash link for each collection.
297     if hash_links is None:
298         hash_links = _get_docker_links(
299             api_client, num_retries,
300             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
301     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
302
303     # Each collection may have more than one name (though again, one name
304     # may be specified more than once).  Build an image listing from name
305     # tags, sorted by API server preference.
306     if repo_links is None:
307         repo_links = _get_docker_links(
308             api_client, num_retries,
309             filters=search_filters + [['link_class', '=',
310                                        'docker_image_repo+tag']])
311     seen_image_names = collections.defaultdict(set)
312     images = []
313     for link in repo_links:
314         collection_uuid = link['head_uuid']
315         if link['name'] in seen_image_names[collection_uuid]:
316             continue
317         seen_image_names[collection_uuid].add(link['name'])
318         try:
319             dockerhash = hash_link_map[collection_uuid]['name']
320         except KeyError:
321             dockerhash = '<unknown>'
322         name_parts = link['name'].split(':', 1)
323         images.append(_new_image_listing(link, dockerhash, *name_parts))
324
325     # Find any image hash links that did not have a corresponding name link,
326     # and add image listings for them, retaining the API server preference
327     # sorting.
328     images_start_size = len(images)
329     for collection_uuid, link in hash_link_map.items():
330         if not seen_image_names[collection_uuid]:
331             images.append(_new_image_listing(link, link['name']))
332     if len(images) > images_start_size:
333         images.sort(key=itemgetter('_sort_key'), reverse=True)
334
335     # Remove any image listings that refer to unknown collections.
336     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
337             api_client.collections().list, num_retries,
338             filters=[['uuid', 'in', [im['collection'] for im in images]]],
339             select=['uuid'])}
340     return [(image['collection'], image) for image in images
341             if image['collection'] in existing_coll_uuids]
342
343 def items_owned_by(owner_uuid, arv_items):
344     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
345
346 def _uuid2pdh(api, uuid):
347     return api.collections().list(
348         filters=[['uuid', '=', uuid]],
349         select=['portable_data_hash'],
350     ).execute()['items'][0]['portable_data_hash']
351
352 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
353     args = arg_parser.parse_args(arguments)
354     if api is None:
355         api = arvados.api('v1')
356
357     if args.image is None or args.image == 'images':
358         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
359         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
360         try:
361             for i, j in list_images_in_arv(api, args.retries):
362                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
363         except IOError as e:
364             if e.errno == errno.EPIPE:
365                 pass
366             else:
367                 raise
368         sys.exit(0)
369
370     if re.search(r':\w[-.\w]{0,127}$', args.image):
371         # image ends with :valid-tag
372         if args.tag is not None:
373             logger.error(
374                 "image %r already includes a tag, cannot add tag argument %r",
375                 args.image, args.tag)
376             sys.exit(1)
377         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
378         args.image, args.tag = args.image.rsplit(':', 1)
379     elif args.tag is None:
380         args.tag = 'latest'
381
382     # Pull the image if requested, unless the image is specified as a hash
383     # that we already have.
384     if args.pull and not find_image_hashes(args.image):
385         pull_image(args.image, args.tag)
386
387     try:
388         image_hash = find_one_image_hash(args.image, args.tag)
389     except DockerError as error:
390         logger.error(error.message)
391         sys.exit(1)
392
393     if not docker_image_compatible(api, image_hash):
394         if args.force_image_format:
395             logger.warning("forcing incompatible image")
396         else:
397             logger.error("refusing to store " \
398                 "incompatible format (use --force-image-format to override)")
399             sys.exit(1)
400
401     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
402
403     if args.name is None:
404         if image_repo_tag:
405             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
406         else:
407             collection_name = 'Docker image {}'.format(image_hash[0:12])
408     else:
409         collection_name = args.name
410
411     # Acquire a lock so that only one arv-keepdocker process will
412     # dump/upload a particular docker image at a time.  Do this before
413     # checking if the image already exists in Arvados so that if there
414     # is an upload already underway, when that upload completes and
415     # this process gets a turn, it will discover the Docker image is
416     # already available and exit quickly.
417     outfile_name = '{}.tar'.format(image_hash)
418     lockfile_name = '{}.lock'.format(outfile_name)
419     lockfile = None
420     cache_dir = get_cache_dir()
421     if cache_dir:
422         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
423         fcntl.flock(lockfile, fcntl.LOCK_EX)
424
425     try:
426         if not args.force:
427             # Check if this image is already in Arvados.
428
429             # Project where everything should be owned
430             parent_project_uuid = args.project_uuid or api.users().current().execute(
431                 num_retries=args.retries)['uuid']
432
433             # Find image hash tags
434             existing_links = _get_docker_links(
435                 api, args.retries,
436                 filters=[['link_class', '=', 'docker_image_hash'],
437                          ['name', '=', image_hash]])
438             if existing_links:
439                 # get readable collections
440                 collections = api.collections().list(
441                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
442                     select=["uuid", "owner_uuid", "name", "manifest_text"]
443                     ).execute(num_retries=args.retries)['items']
444
445                 if collections:
446                     # check for repo+tag links on these collections
447                     if image_repo_tag:
448                         existing_repo_tag = _get_docker_links(
449                             api, args.retries,
450                             filters=[['link_class', '=', 'docker_image_repo+tag'],
451                                      ['name', '=', image_repo_tag],
452                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
453                     else:
454                         existing_repo_tag = []
455
456                     try:
457                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
458                     except StopIteration:
459                         # create new collection owned by the project
460                         coll_uuid = api.collections().create(
461                             body={"manifest_text": collections[0]['manifest_text'],
462                                   "name": collection_name,
463                                   "owner_uuid": parent_project_uuid},
464                             ensure_unique_name=True
465                             ).execute(num_retries=args.retries)['uuid']
466
467                     link_base = {'owner_uuid': parent_project_uuid,
468                                  'head_uuid':  coll_uuid,
469                                  'properties': existing_links[0]['properties']}
470
471                     if not any(items_owned_by(parent_project_uuid, existing_links)):
472                         # create image link owned by the project
473                         make_link(api, args.retries,
474                                   'docker_image_hash', image_hash, **link_base)
475
476                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
477                         # create repo+tag link owned by the project
478                         make_link(api, args.retries, 'docker_image_repo+tag',
479                                   image_repo_tag, **link_base)
480
481                     stdout.write(coll_uuid + "\n")
482
483                     sys.exit(0)
484
485         # Open a file for the saved image, and write it if needed.
486         image_file, need_save = prep_image_file(outfile_name)
487         if need_save:
488             save_image(image_hash, image_file)
489
490         # Call arv-put with switches we inherited from it
491         # (a.k.a., switches that aren't our own).
492         put_args = keepdocker_parser.parse_known_args(arguments)[1]
493
494         if args.name is None:
495             put_args += ['--name', collection_name]
496
497         coll_uuid = arv_put.main(
498             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
499             install_sig_handlers=install_sig_handlers).strip()
500
501         # Read the image metadata and make Arvados links from it.
502         image_file.seek(0)
503         image_tar = tarfile.open(fileobj=image_file)
504         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
505         if image_hash_type:
506             json_filename = raw_image_hash + '.json'
507         else:
508             json_filename = raw_image_hash + '/json'
509         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
510         image_metadata = json.loads(json_file.read().decode())
511         json_file.close()
512         image_tar.close()
513         link_base = {'head_uuid': coll_uuid, 'properties': {}}
514         if 'created' in image_metadata:
515             link_base['properties']['image_timestamp'] = image_metadata['created']
516         if args.project_uuid is not None:
517             link_base['owner_uuid'] = args.project_uuid
518
519         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
520         if image_repo_tag:
521             make_link(api, args.retries,
522                       'docker_image_repo+tag', image_repo_tag, **link_base)
523
524         # Clean up.
525         image_file.close()
526         for filename in [stat_cache_name(image_file), image_file.name]:
527             try:
528                 os.unlink(filename)
529             except OSError as error:
530                 if error.errno != errno.ENOENT:
531                     raise
532     finally:
533         if lockfile is not None:
534             # Closing the lockfile unlocks it.
535             lockfile.close()
536
537 if __name__ == '__main__':
538     main()