Merge branch 'master' into 13804-no-shutdown-wanted-nodes
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import subprocess32 as subprocess
14 import sys
15 import tarfile
16 import tempfile
17 import shutil
18 import _strptime
19 import fcntl
20
21 from operator import itemgetter
22 from stat import *
23
24 import arvados
25 import arvados.util
26 import arvados.commands._util as arv_cmd
27 import arvados.commands.put as arv_put
28 from arvados.collection import CollectionReader
29 import ciso8601
30 import logging
31 import arvados.config
32
33 from arvados._version import __version__
34
35 logger = logging.getLogger('arvados.keepdocker')
36 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
37                 else logging.INFO)
38
39 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
40 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
41
42 DockerImage = collections.namedtuple(
43     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
44
45 keepdocker_parser = argparse.ArgumentParser(add_help=False)
46 keepdocker_parser.add_argument(
47     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
48     help='Print version and exit.')
49 keepdocker_parser.add_argument(
50     '-f', '--force', action='store_true', default=False,
51     help="Re-upload the image even if it already exists on the server")
52 keepdocker_parser.add_argument(
53     '--force-image-format', action='store_true', default=False,
54     help="Proceed even if the image format is not supported by the server")
55
56 _group = keepdocker_parser.add_mutually_exclusive_group()
57 _group.add_argument(
58     '--pull', action='store_true', default=False,
59     help="Try to pull the latest image from Docker registry")
60 _group.add_argument(
61     '--no-pull', action='store_false', dest='pull',
62     help="Use locally installed image only, don't pull image from Docker registry (default)")
63
64 keepdocker_parser.add_argument(
65     'image', nargs='?',
66     help="Docker image to upload: repo, repo:tag, or hash")
67 keepdocker_parser.add_argument(
68     'tag', nargs='?',
69     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
70
71 # Combine keepdocker options listed above with run_opts options of arv-put.
72 # The options inherited from arv-put include --name, --project-uuid,
73 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
74 arg_parser = argparse.ArgumentParser(
75         description="Upload or list Docker images in Arvados",
76         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
77
78 class DockerError(Exception):
79     pass
80
81
82 def popen_docker(cmd, *args, **kwargs):
83     manage_stdin = ('stdin' not in kwargs)
84     kwargs.setdefault('stdin', subprocess.PIPE)
85     kwargs.setdefault('stdout', sys.stderr)
86     try:
87         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
88     except OSError:  # No docker.io in $PATH
89         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
90     if manage_stdin:
91         docker_proc.stdin.close()
92     return docker_proc
93
94 def check_docker(proc, description):
95     proc.wait()
96     if proc.returncode != 0:
97         raise DockerError("docker {} returned status code {}".
98                           format(description, proc.returncode))
99
100 def docker_image_format(image_hash):
101     """Return the registry format ('v1' or 'v2') of the given image."""
102     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
103                         stdout=subprocess.PIPE)
104     try:
105         image_id = next(cmd.stdout).decode().strip()
106         if image_id.startswith('sha256:'):
107             return 'v2'
108         elif ':' not in image_id:
109             return 'v1'
110         else:
111             return 'unknown'
112     finally:
113         check_docker(cmd, "inspect")
114
115 def docker_image_compatible(api, image_hash):
116     supported = api._rootDesc.get('dockerImageFormats', [])
117     if not supported:
118         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
119         return False
120
121     fmt = docker_image_format(image_hash)
122     if fmt in supported:
123         return True
124     else:
125         logger.error("image format is {!r} " \
126             "but server supports only {!r}".format(fmt, supported))
127         return False
128
129 def docker_images():
130     # Yield a DockerImage tuple for each installed image.
131     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
132     list_output = iter(list_proc.stdout)
133     next(list_output)  # Ignore the header line
134     for line in list_output:
135         words = line.split()
136         size_index = len(words) - 2
137         repo, tag, imageid = words[:3]
138         ctime = ' '.join(words[3:size_index])
139         vsize = ' '.join(words[size_index:])
140         yield DockerImage(repo, tag, imageid, ctime, vsize)
141     list_proc.stdout.close()
142     check_docker(list_proc, "images")
143
144 def find_image_hashes(image_search, image_tag=None):
145     # Given one argument, search for Docker images with matching hashes,
146     # and return their full hashes in a set.
147     # Given two arguments, also search for a Docker image with the
148     # same repository and tag.  If one is found, return its hash in a
149     # set; otherwise, fall back to the one-argument hash search.
150     # Returns None if no match is found, or a hash search is ambiguous.
151     hash_search = image_search.lower()
152     hash_matches = set()
153     for image in docker_images():
154         if (image.repo == image_search) and (image.tag == image_tag):
155             return set([image.hash])
156         elif image.hash.startswith(hash_search):
157             hash_matches.add(image.hash)
158     return hash_matches
159
160 def find_one_image_hash(image_search, image_tag=None):
161     hashes = find_image_hashes(image_search, image_tag)
162     hash_count = len(hashes)
163     if hash_count == 1:
164         return hashes.pop()
165     elif hash_count == 0:
166         raise DockerError("no matching image found")
167     else:
168         raise DockerError("{} images match {}".format(hash_count, image_search))
169
170 def stat_cache_name(image_file):
171     return getattr(image_file, 'name', image_file) + '.stat'
172
173 def pull_image(image_name, image_tag):
174     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
175                  "pull")
176
177 def save_image(image_hash, image_file):
178     # Save the specified Docker image to image_file, then try to save its
179     # stats so we can try to resume after interruption.
180     check_docker(popen_docker(['save', image_hash], stdout=image_file),
181                  "save")
182     image_file.flush()
183     try:
184         with open(stat_cache_name(image_file), 'w') as statfile:
185             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
186     except STAT_CACHE_ERRORS:
187         pass  # We won't resume from this cache.  No big deal.
188
189 def get_cache_dir():
190     return arv_cmd.make_home_conf_dir(
191         os.path.join('.cache', 'arvados', 'docker'), 0o700)
192
193 def prep_image_file(filename):
194     # Return a file object ready to save a Docker image,
195     # and a boolean indicating whether or not we need to actually save the
196     # image (False if a cached save is available).
197     cache_dir = get_cache_dir()
198     if cache_dir is None:
199         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
200         need_save = True
201     else:
202         file_path = os.path.join(cache_dir, filename)
203         try:
204             with open(stat_cache_name(file_path)) as statfile:
205                 prev_stat = json.load(statfile)
206             now_stat = os.stat(file_path)
207             need_save = any(prev_stat[field] != now_stat[field]
208                             for field in [ST_MTIME, ST_SIZE])
209         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
210             need_save = True  # We couldn't compare against old stats
211         image_file = open(file_path, 'w+b' if need_save else 'rb')
212     return image_file, need_save
213
214 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
215     link_attrs.update({'link_class': link_class, 'name': link_name})
216     return api_client.links().create(body=link_attrs).execute(
217         num_retries=num_retries)
218
219 def docker_link_sort_key(link):
220     """Build a sort key to find the latest available Docker image.
221
222     To find one source collection for a Docker image referenced by
223     name or image id, the API server looks for a link with the most
224     recent `image_timestamp` property; then the most recent
225     `created_at` timestamp.  This method generates a sort key for
226     Docker metadata links to sort them from least to most preferred.
227     """
228     try:
229         image_timestamp = ciso8601.parse_datetime_unaware(
230             link['properties']['image_timestamp'])
231     except (KeyError, ValueError):
232         image_timestamp = EARLIEST_DATETIME
233     return (image_timestamp,
234             ciso8601.parse_datetime_unaware(link['created_at']))
235
236 def _get_docker_links(api_client, num_retries, **kwargs):
237     links = arvados.util.list_all(api_client.links().list,
238                                   num_retries, **kwargs)
239     for link in links:
240         link['_sort_key'] = docker_link_sort_key(link)
241     links.sort(key=itemgetter('_sort_key'), reverse=True)
242     return links
243
244 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
245     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
246     return {
247         '_sort_key': link['_sort_key'],
248         'timestamp': link['_sort_key'][timestamp_index],
249         'collection': link['head_uuid'],
250         'dockerhash': dockerhash,
251         'repo': repo,
252         'tag': tag,
253         }
254
255 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
256     """List all Docker images known to the api_client with image_name and
257     image_tag.  If no image_name is given, defaults to listing all
258     Docker images.
259
260     Returns a list of tuples representing matching Docker images,
261     sorted in preference order (i.e. the first collection in the list
262     is the one that the API server would use). Each tuple is a
263     (collection_uuid, collection_info) pair, where collection_info is
264     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
265
266     """
267     search_filters = []
268     repo_links = None
269     hash_links = None
270     if image_name:
271         # Find images with the name the user specified.
272         search_links = _get_docker_links(
273             api_client, num_retries,
274             filters=[['link_class', '=', 'docker_image_repo+tag'],
275                      ['name', '=',
276                       '{}:{}'.format(image_name, image_tag or 'latest')]])
277         if search_links:
278             repo_links = search_links
279         else:
280             # Fall back to finding images with the specified image hash.
281             search_links = _get_docker_links(
282                 api_client, num_retries,
283                 filters=[['link_class', '=', 'docker_image_hash'],
284                          ['name', 'ilike', image_name + '%']])
285             hash_links = search_links
286         # Only list information about images that were found in the search.
287         search_filters.append(['head_uuid', 'in',
288                                [link['head_uuid'] for link in search_links]])
289
290     # It should be reasonable to expect that each collection only has one
291     # image hash (though there may be many links specifying this).  Find
292     # the API server's most preferred image hash link for each collection.
293     if hash_links is None:
294         hash_links = _get_docker_links(
295             api_client, num_retries,
296             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
297     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
298
299     # Each collection may have more than one name (though again, one name
300     # may be specified more than once).  Build an image listing from name
301     # tags, sorted by API server preference.
302     if repo_links is None:
303         repo_links = _get_docker_links(
304             api_client, num_retries,
305             filters=search_filters + [['link_class', '=',
306                                        'docker_image_repo+tag']])
307     seen_image_names = collections.defaultdict(set)
308     images = []
309     for link in repo_links:
310         collection_uuid = link['head_uuid']
311         if link['name'] in seen_image_names[collection_uuid]:
312             continue
313         seen_image_names[collection_uuid].add(link['name'])
314         try:
315             dockerhash = hash_link_map[collection_uuid]['name']
316         except KeyError:
317             dockerhash = '<unknown>'
318         name_parts = link['name'].split(':', 1)
319         images.append(_new_image_listing(link, dockerhash, *name_parts))
320
321     # Find any image hash links that did not have a corresponding name link,
322     # and add image listings for them, retaining the API server preference
323     # sorting.
324     images_start_size = len(images)
325     for collection_uuid, link in hash_link_map.items():
326         if not seen_image_names[collection_uuid]:
327             images.append(_new_image_listing(link, link['name']))
328     if len(images) > images_start_size:
329         images.sort(key=itemgetter('_sort_key'), reverse=True)
330
331     # Remove any image listings that refer to unknown collections.
332     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
333             api_client.collections().list, num_retries,
334             filters=[['uuid', 'in', [im['collection'] for im in images]]],
335             select=['uuid'])}
336     return [(image['collection'], image) for image in images
337             if image['collection'] in existing_coll_uuids]
338
339 def items_owned_by(owner_uuid, arv_items):
340     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
341
342 def _uuid2pdh(api, uuid):
343     return api.collections().list(
344         filters=[['uuid', '=', uuid]],
345         select=['portable_data_hash'],
346     ).execute()['items'][0]['portable_data_hash']
347
348 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
349     args = arg_parser.parse_args(arguments)
350     if api is None:
351         api = arvados.api('v1')
352
353     if args.image is None or args.image == 'images':
354         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
355         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
356         try:
357             for i, j in list_images_in_arv(api, args.retries):
358                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
359         except IOError as e:
360             if e.errno == errno.EPIPE:
361                 pass
362             else:
363                 raise
364         sys.exit(0)
365
366     if re.search(r':\w[-.\w]{0,127}$', args.image):
367         # image ends with :valid-tag
368         if args.tag is not None:
369             logger.error(
370                 "image %r already includes a tag, cannot add tag argument %r",
371                 args.image, args.tag)
372             sys.exit(1)
373         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
374         args.image, args.tag = args.image.rsplit(':', 1)
375     elif args.tag is None:
376         args.tag = 'latest'
377
378     # Pull the image if requested, unless the image is specified as a hash
379     # that we already have.
380     if args.pull and not find_image_hashes(args.image):
381         pull_image(args.image, args.tag)
382
383     try:
384         image_hash = find_one_image_hash(args.image, args.tag)
385     except DockerError as error:
386         logger.error(error.message)
387         sys.exit(1)
388
389     if not docker_image_compatible(api, image_hash):
390         if args.force_image_format:
391             logger.warning("forcing incompatible image")
392         else:
393             logger.error("refusing to store " \
394                 "incompatible format (use --force-image-format to override)")
395             sys.exit(1)
396
397     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
398
399     if args.name is None:
400         if image_repo_tag:
401             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
402         else:
403             collection_name = 'Docker image {}'.format(image_hash[0:12])
404     else:
405         collection_name = args.name
406
407     # Acquire a lock so that only one arv-keepdocker process will
408     # dump/upload a particular docker image at a time.  Do this before
409     # checking if the image already exists in Arvados so that if there
410     # is an upload already underway, when that upload completes and
411     # this process gets a turn, it will discover the Docker image is
412     # already available and exit quickly.
413     outfile_name = '{}.tar'.format(image_hash)
414     lockfile_name = '{}.lock'.format(outfile_name)
415     lockfile = None
416     cache_dir = get_cache_dir()
417     if cache_dir:
418         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
419         fcntl.flock(lockfile, fcntl.LOCK_EX)
420
421     try:
422         if not args.force:
423             # Check if this image is already in Arvados.
424
425             # Project where everything should be owned
426             parent_project_uuid = args.project_uuid or api.users().current().execute(
427                 num_retries=args.retries)['uuid']
428
429             # Find image hash tags
430             existing_links = _get_docker_links(
431                 api, args.retries,
432                 filters=[['link_class', '=', 'docker_image_hash'],
433                          ['name', '=', image_hash]])
434             if existing_links:
435                 # get readable collections
436                 collections = api.collections().list(
437                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
438                     select=["uuid", "owner_uuid", "name", "manifest_text"]
439                     ).execute(num_retries=args.retries)['items']
440
441                 if collections:
442                     # check for repo+tag links on these collections
443                     if image_repo_tag:
444                         existing_repo_tag = _get_docker_links(
445                             api, args.retries,
446                             filters=[['link_class', '=', 'docker_image_repo+tag'],
447                                      ['name', '=', image_repo_tag],
448                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
449                     else:
450                         existing_repo_tag = []
451
452                     try:
453                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
454                     except StopIteration:
455                         # create new collection owned by the project
456                         coll_uuid = api.collections().create(
457                             body={"manifest_text": collections[0]['manifest_text'],
458                                   "name": collection_name,
459                                   "owner_uuid": parent_project_uuid},
460                             ensure_unique_name=True
461                             ).execute(num_retries=args.retries)['uuid']
462
463                     link_base = {'owner_uuid': parent_project_uuid,
464                                  'head_uuid':  coll_uuid,
465                                  'properties': existing_links[0]['properties']}
466
467                     if not any(items_owned_by(parent_project_uuid, existing_links)):
468                         # create image link owned by the project
469                         make_link(api, args.retries,
470                                   'docker_image_hash', image_hash, **link_base)
471
472                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
473                         # create repo+tag link owned by the project
474                         make_link(api, args.retries, 'docker_image_repo+tag',
475                                   image_repo_tag, **link_base)
476
477                     stdout.write(coll_uuid + "\n")
478
479                     sys.exit(0)
480
481         # Open a file for the saved image, and write it if needed.
482         image_file, need_save = prep_image_file(outfile_name)
483         if need_save:
484             save_image(image_hash, image_file)
485
486         # Call arv-put with switches we inherited from it
487         # (a.k.a., switches that aren't our own).
488         put_args = keepdocker_parser.parse_known_args(arguments)[1]
489
490         if args.name is None:
491             put_args += ['--name', collection_name]
492
493         coll_uuid = arv_put.main(
494             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
495             install_sig_handlers=install_sig_handlers).strip()
496
497         # Read the image metadata and make Arvados links from it.
498         image_file.seek(0)
499         image_tar = tarfile.open(fileobj=image_file)
500         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
501         if image_hash_type:
502             json_filename = raw_image_hash + '.json'
503         else:
504             json_filename = raw_image_hash + '/json'
505         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
506         image_metadata = json.load(json_file)
507         json_file.close()
508         image_tar.close()
509         link_base = {'head_uuid': coll_uuid, 'properties': {}}
510         if 'created' in image_metadata:
511             link_base['properties']['image_timestamp'] = image_metadata['created']
512         if args.project_uuid is not None:
513             link_base['owner_uuid'] = args.project_uuid
514
515         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
516         if image_repo_tag:
517             make_link(api, args.retries,
518                       'docker_image_repo+tag', image_repo_tag, **link_base)
519
520         # Clean up.
521         image_file.close()
522         for filename in [stat_cache_name(image_file), image_file.name]:
523             try:
524                 os.unlink(filename)
525             except OSError as error:
526                 if error.errno != errno.ENOENT:
527                     raise
528     finally:
529         if lockfile is not None:
530             # Closing the lockfile unlocks it.
531             lockfile.close()
532
533 if __name__ == '__main__':
534     main()