15148: Include list of affected PDHs in LostBlocksFile.
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import sys
14 import tarfile
15 import tempfile
16 import shutil
17 import _strptime
18 import fcntl
19 from operator import itemgetter
20 from stat import *
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 import arvados
28 import arvados.util
29 import arvados.commands._util as arv_cmd
30 import arvados.commands.put as arv_put
31 from arvados.collection import CollectionReader
32 import ciso8601
33 import logging
34 import arvados.config
35
36 from arvados._version import __version__
37
38 logger = logging.getLogger('arvados.keepdocker')
39 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
40                 else logging.INFO)
41
42 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
43 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
44
45 DockerImage = collections.namedtuple(
46     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
47
48 keepdocker_parser = argparse.ArgumentParser(add_help=False)
49 keepdocker_parser.add_argument(
50     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
51     help='Print version and exit.')
52 keepdocker_parser.add_argument(
53     '-f', '--force', action='store_true', default=False,
54     help="Re-upload the image even if it already exists on the server")
55 keepdocker_parser.add_argument(
56     '--force-image-format', action='store_true', default=False,
57     help="Proceed even if the image format is not supported by the server")
58
59 _group = keepdocker_parser.add_mutually_exclusive_group()
60 _group.add_argument(
61     '--pull', action='store_true', default=False,
62     help="Try to pull the latest image from Docker registry")
63 _group.add_argument(
64     '--no-pull', action='store_false', dest='pull',
65     help="Use locally installed image only, don't pull image from Docker registry (default)")
66
67 # Combine keepdocker options listed above with run_opts options of arv-put.
68 # The options inherited from arv-put include --name, --project-uuid,
69 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
70 arg_parser = argparse.ArgumentParser(
71         description="Upload or list Docker images in Arvados",
72         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
73
74 arg_parser.add_argument(
75     'image', nargs='?',
76     help="Docker image to upload: repo, repo:tag, or hash")
77 arg_parser.add_argument(
78     'tag', nargs='?',
79     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
80
81 class DockerError(Exception):
82     pass
83
84
85 def popen_docker(cmd, *args, **kwargs):
86     manage_stdin = ('stdin' not in kwargs)
87     kwargs.setdefault('stdin', subprocess.PIPE)
88     kwargs.setdefault('stdout', sys.stderr)
89     try:
90         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
91     except OSError:  # No docker.io in $PATH
92         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
93     if manage_stdin:
94         docker_proc.stdin.close()
95     return docker_proc
96
97 def check_docker(proc, description):
98     proc.wait()
99     if proc.returncode != 0:
100         raise DockerError("docker {} returned status code {}".
101                           format(description, proc.returncode))
102
103 def docker_image_format(image_hash):
104     """Return the registry format ('v1' or 'v2') of the given image."""
105     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
106                         stdout=subprocess.PIPE)
107     try:
108         image_id = next(cmd.stdout).decode().strip()
109         if image_id.startswith('sha256:'):
110             return 'v2'
111         elif ':' not in image_id:
112             return 'v1'
113         else:
114             return 'unknown'
115     finally:
116         check_docker(cmd, "inspect")
117
118 def docker_image_compatible(api, image_hash):
119     supported = api._rootDesc.get('dockerImageFormats', [])
120     if not supported:
121         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
122         return False
123
124     fmt = docker_image_format(image_hash)
125     if fmt in supported:
126         return True
127     else:
128         logger.error("image format is {!r} " \
129             "but server supports only {!r}".format(fmt, supported))
130         return False
131
132 def docker_images():
133     # Yield a DockerImage tuple for each installed image.
134     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
135     list_output = iter(list_proc.stdout)
136     next(list_output)  # Ignore the header line
137     for line in list_output:
138         words = line.split()
139         words = [word.decode() for word in words]
140         size_index = len(words) - 2
141         repo, tag, imageid = words[:3]
142         ctime = ' '.join(words[3:size_index])
143         vsize = ' '.join(words[size_index:])
144         yield DockerImage(repo, tag, imageid, ctime, vsize)
145     list_proc.stdout.close()
146     check_docker(list_proc, "images")
147
148 def find_image_hashes(image_search, image_tag=None):
149     # Given one argument, search for Docker images with matching hashes,
150     # and return their full hashes in a set.
151     # Given two arguments, also search for a Docker image with the
152     # same repository and tag.  If one is found, return its hash in a
153     # set; otherwise, fall back to the one-argument hash search.
154     # Returns None if no match is found, or a hash search is ambiguous.
155     hash_search = image_search.lower()
156     hash_matches = set()
157     for image in docker_images():
158         if (image.repo == image_search) and (image.tag == image_tag):
159             return set([image.hash])
160         elif image.hash.startswith(hash_search):
161             hash_matches.add(image.hash)
162     return hash_matches
163
164 def find_one_image_hash(image_search, image_tag=None):
165     hashes = find_image_hashes(image_search, image_tag)
166     hash_count = len(hashes)
167     if hash_count == 1:
168         return hashes.pop()
169     elif hash_count == 0:
170         raise DockerError("no matching image found")
171     else:
172         raise DockerError("{} images match {}".format(hash_count, image_search))
173
174 def stat_cache_name(image_file):
175     return getattr(image_file, 'name', image_file) + '.stat'
176
177 def pull_image(image_name, image_tag):
178     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
179                  "pull")
180
181 def save_image(image_hash, image_file):
182     # Save the specified Docker image to image_file, then try to save its
183     # stats so we can try to resume after interruption.
184     check_docker(popen_docker(['save', image_hash], stdout=image_file),
185                  "save")
186     image_file.flush()
187     try:
188         with open(stat_cache_name(image_file), 'w') as statfile:
189             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
190     except STAT_CACHE_ERRORS:
191         pass  # We won't resume from this cache.  No big deal.
192
193 def get_cache_dir():
194     return arv_cmd.make_home_conf_dir(
195         os.path.join('.cache', 'arvados', 'docker'), 0o700)
196
197 def prep_image_file(filename):
198     # Return a file object ready to save a Docker image,
199     # and a boolean indicating whether or not we need to actually save the
200     # image (False if a cached save is available).
201     cache_dir = get_cache_dir()
202     if cache_dir is None:
203         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
204         need_save = True
205     else:
206         file_path = os.path.join(cache_dir, filename)
207         try:
208             with open(stat_cache_name(file_path)) as statfile:
209                 prev_stat = json.load(statfile)
210             now_stat = os.stat(file_path)
211             need_save = any(prev_stat[field] != now_stat[field]
212                             for field in [ST_MTIME, ST_SIZE])
213         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
214             need_save = True  # We couldn't compare against old stats
215         image_file = open(file_path, 'w+b' if need_save else 'rb')
216     return image_file, need_save
217
218 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
219     link_attrs.update({'link_class': link_class, 'name': link_name})
220     return api_client.links().create(body=link_attrs).execute(
221         num_retries=num_retries)
222
223 def docker_link_sort_key(link):
224     """Build a sort key to find the latest available Docker image.
225
226     To find one source collection for a Docker image referenced by
227     name or image id, the API server looks for a link with the most
228     recent `image_timestamp` property; then the most recent
229     `created_at` timestamp.  This method generates a sort key for
230     Docker metadata links to sort them from least to most preferred.
231     """
232     try:
233         image_timestamp = ciso8601.parse_datetime_as_naive(
234             link['properties']['image_timestamp'])
235     except (KeyError, ValueError):
236         image_timestamp = EARLIEST_DATETIME
237     try:
238         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
239     except ValueError:
240         created_timestamp = None
241     return (image_timestamp, created_timestamp)
242
243 def _get_docker_links(api_client, num_retries, **kwargs):
244     links = arvados.util.list_all(api_client.links().list,
245                                   num_retries, **kwargs)
246     for link in links:
247         link['_sort_key'] = docker_link_sort_key(link)
248     links.sort(key=itemgetter('_sort_key'), reverse=True)
249     return links
250
251 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
252     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
253     return {
254         '_sort_key': link['_sort_key'],
255         'timestamp': link['_sort_key'][timestamp_index],
256         'collection': link['head_uuid'],
257         'dockerhash': dockerhash,
258         'repo': repo,
259         'tag': tag,
260         }
261
262 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
263     """List all Docker images known to the api_client with image_name and
264     image_tag.  If no image_name is given, defaults to listing all
265     Docker images.
266
267     Returns a list of tuples representing matching Docker images,
268     sorted in preference order (i.e. the first collection in the list
269     is the one that the API server would use). Each tuple is a
270     (collection_uuid, collection_info) pair, where collection_info is
271     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
272
273     """
274     search_filters = []
275     repo_links = None
276     hash_links = None
277     if image_name:
278         # Find images with the name the user specified.
279         search_links = _get_docker_links(
280             api_client, num_retries,
281             filters=[['link_class', '=', 'docker_image_repo+tag'],
282                      ['name', '=',
283                       '{}:{}'.format(image_name, image_tag or 'latest')]])
284         if search_links:
285             repo_links = search_links
286         else:
287             # Fall back to finding images with the specified image hash.
288             search_links = _get_docker_links(
289                 api_client, num_retries,
290                 filters=[['link_class', '=', 'docker_image_hash'],
291                          ['name', 'ilike', image_name + '%']])
292             hash_links = search_links
293         # Only list information about images that were found in the search.
294         search_filters.append(['head_uuid', 'in',
295                                [link['head_uuid'] for link in search_links]])
296
297     # It should be reasonable to expect that each collection only has one
298     # image hash (though there may be many links specifying this).  Find
299     # the API server's most preferred image hash link for each collection.
300     if hash_links is None:
301         hash_links = _get_docker_links(
302             api_client, num_retries,
303             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
304     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
305
306     # Each collection may have more than one name (though again, one name
307     # may be specified more than once).  Build an image listing from name
308     # tags, sorted by API server preference.
309     if repo_links is None:
310         repo_links = _get_docker_links(
311             api_client, num_retries,
312             filters=search_filters + [['link_class', '=',
313                                        'docker_image_repo+tag']])
314     seen_image_names = collections.defaultdict(set)
315     images = []
316     for link in repo_links:
317         collection_uuid = link['head_uuid']
318         if link['name'] in seen_image_names[collection_uuid]:
319             continue
320         seen_image_names[collection_uuid].add(link['name'])
321         try:
322             dockerhash = hash_link_map[collection_uuid]['name']
323         except KeyError:
324             dockerhash = '<unknown>'
325         name_parts = link['name'].split(':', 1)
326         images.append(_new_image_listing(link, dockerhash, *name_parts))
327
328     # Find any image hash links that did not have a corresponding name link,
329     # and add image listings for them, retaining the API server preference
330     # sorting.
331     images_start_size = len(images)
332     for collection_uuid, link in hash_link_map.items():
333         if not seen_image_names[collection_uuid]:
334             images.append(_new_image_listing(link, link['name']))
335     if len(images) > images_start_size:
336         images.sort(key=itemgetter('_sort_key'), reverse=True)
337
338     # Remove any image listings that refer to unknown collections.
339     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
340             api_client.collections().list, num_retries,
341             filters=[['uuid', 'in', [im['collection'] for im in images]]],
342             select=['uuid'])}
343     return [(image['collection'], image) for image in images
344             if image['collection'] in existing_coll_uuids]
345
346 def items_owned_by(owner_uuid, arv_items):
347     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
348
349 def _uuid2pdh(api, uuid):
350     return api.collections().list(
351         filters=[['uuid', '=', uuid]],
352         select=['portable_data_hash'],
353     ).execute()['items'][0]['portable_data_hash']
354
355 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
356     args = arg_parser.parse_args(arguments)
357     if api is None:
358         api = arvados.api('v1')
359
360     if args.image is None or args.image == 'images':
361         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
362         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
363         try:
364             for i, j in list_images_in_arv(api, args.retries):
365                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
366         except IOError as e:
367             if e.errno == errno.EPIPE:
368                 pass
369             else:
370                 raise
371         sys.exit(0)
372
373     if re.search(r':\w[-.\w]{0,127}$', args.image):
374         # image ends with :valid-tag
375         if args.tag is not None:
376             logger.error(
377                 "image %r already includes a tag, cannot add tag argument %r",
378                 args.image, args.tag)
379             sys.exit(1)
380         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
381         args.image, args.tag = args.image.rsplit(':', 1)
382     elif args.tag is None:
383         args.tag = 'latest'
384
385     # Pull the image if requested, unless the image is specified as a hash
386     # that we already have.
387     if args.pull and not find_image_hashes(args.image):
388         pull_image(args.image, args.tag)
389
390     try:
391         image_hash = find_one_image_hash(args.image, args.tag)
392     except DockerError as error:
393         logger.error(error.message)
394         sys.exit(1)
395
396     if not docker_image_compatible(api, image_hash):
397         if args.force_image_format:
398             logger.warning("forcing incompatible image")
399         else:
400             logger.error("refusing to store " \
401                 "incompatible format (use --force-image-format to override)")
402             sys.exit(1)
403
404     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
405
406     if args.name is None:
407         if image_repo_tag:
408             collection_name = 'Docker image {} {}'.format(image_repo_tag, image_hash[0:12])
409         else:
410             collection_name = 'Docker image {}'.format(image_hash[0:12])
411     else:
412         collection_name = args.name
413
414     # Acquire a lock so that only one arv-keepdocker process will
415     # dump/upload a particular docker image at a time.  Do this before
416     # checking if the image already exists in Arvados so that if there
417     # is an upload already underway, when that upload completes and
418     # this process gets a turn, it will discover the Docker image is
419     # already available and exit quickly.
420     outfile_name = '{}.tar'.format(image_hash)
421     lockfile_name = '{}.lock'.format(outfile_name)
422     lockfile = None
423     cache_dir = get_cache_dir()
424     if cache_dir:
425         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
426         fcntl.flock(lockfile, fcntl.LOCK_EX)
427
428     try:
429         if not args.force:
430             # Check if this image is already in Arvados.
431
432             # Project where everything should be owned
433             parent_project_uuid = args.project_uuid or api.users().current().execute(
434                 num_retries=args.retries)['uuid']
435
436             # Find image hash tags
437             existing_links = _get_docker_links(
438                 api, args.retries,
439                 filters=[['link_class', '=', 'docker_image_hash'],
440                          ['name', '=', image_hash]])
441             if existing_links:
442                 # get readable collections
443                 collections = api.collections().list(
444                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
445                     select=["uuid", "owner_uuid", "name", "manifest_text"]
446                     ).execute(num_retries=args.retries)['items']
447
448                 if collections:
449                     # check for repo+tag links on these collections
450                     if image_repo_tag:
451                         existing_repo_tag = _get_docker_links(
452                             api, args.retries,
453                             filters=[['link_class', '=', 'docker_image_repo+tag'],
454                                      ['name', '=', image_repo_tag],
455                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
456                     else:
457                         existing_repo_tag = []
458
459                     try:
460                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
461                     except StopIteration:
462                         # create new collection owned by the project
463                         coll_uuid = api.collections().create(
464                             body={"manifest_text": collections[0]['manifest_text'],
465                                   "name": collection_name,
466                                   "owner_uuid": parent_project_uuid},
467                             ensure_unique_name=True
468                             ).execute(num_retries=args.retries)['uuid']
469
470                     link_base = {'owner_uuid': parent_project_uuid,
471                                  'head_uuid':  coll_uuid,
472                                  'properties': existing_links[0]['properties']}
473
474                     if not any(items_owned_by(parent_project_uuid, existing_links)):
475                         # create image link owned by the project
476                         make_link(api, args.retries,
477                                   'docker_image_hash', image_hash, **link_base)
478
479                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
480                         # create repo+tag link owned by the project
481                         make_link(api, args.retries, 'docker_image_repo+tag',
482                                   image_repo_tag, **link_base)
483
484                     stdout.write(coll_uuid + "\n")
485
486                     sys.exit(0)
487
488         # Open a file for the saved image, and write it if needed.
489         image_file, need_save = prep_image_file(outfile_name)
490         if need_save:
491             save_image(image_hash, image_file)
492
493         # Call arv-put with switches we inherited from it
494         # (a.k.a., switches that aren't our own).
495         if arguments is None:
496             arguments = sys.argv[1:]
497         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
498         put_args = keepdocker_parser.parse_known_args(arguments)[1]
499
500         if args.name is None:
501             put_args += ['--name', collection_name]
502
503         coll_uuid = arv_put.main(
504             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
505             install_sig_handlers=install_sig_handlers).strip()
506
507         # Read the image metadata and make Arvados links from it.
508         image_file.seek(0)
509         image_tar = tarfile.open(fileobj=image_file)
510         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
511         if image_hash_type:
512             json_filename = raw_image_hash + '.json'
513         else:
514             json_filename = raw_image_hash + '/json'
515         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
516         image_metadata = json.loads(json_file.read().decode())
517         json_file.close()
518         image_tar.close()
519         link_base = {'head_uuid': coll_uuid, 'properties': {}}
520         if 'created' in image_metadata:
521             link_base['properties']['image_timestamp'] = image_metadata['created']
522         if args.project_uuid is not None:
523             link_base['owner_uuid'] = args.project_uuid
524
525         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
526         if image_repo_tag:
527             make_link(api, args.retries,
528                       'docker_image_repo+tag', image_repo_tag, **link_base)
529
530         # Clean up.
531         image_file.close()
532         for filename in [stat_cache_name(image_file), image_file.name]:
533             try:
534                 os.unlink(filename)
535             except OSError as error:
536                 if error.errno != errno.ENOENT:
537                     raise
538     finally:
539         if lockfile is not None:
540             # Closing the lockfile unlocks it.
541             lockfile.close()
542
543 if __name__ == '__main__':
544     main()