20937: Improve error handling
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import sys
14 import tarfile
15 import tempfile
16 import shutil
17 import _strptime
18 import fcntl
19 from operator import itemgetter
20 from stat import *
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 import arvados
28 import arvados.util
29 import arvados.commands._util as arv_cmd
30 import arvados.commands.put as arv_put
31 from arvados.collection import CollectionReader
32 import ciso8601
33 import logging
34 import arvados.config
35
36 from arvados._version import __version__
37
38 logger = logging.getLogger('arvados.keepdocker')
39 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
40                 else logging.INFO)
41
42 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
43 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
44
45 DockerImage = collections.namedtuple(
46     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
47
48 keepdocker_parser = argparse.ArgumentParser(add_help=False)
49 keepdocker_parser.add_argument(
50     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
51     help='Print version and exit.')
52 keepdocker_parser.add_argument(
53     '-f', '--force', action='store_true', default=False,
54     help="Re-upload the image even if it already exists on the server")
55 keepdocker_parser.add_argument(
56     '--force-image-format', action='store_true', default=False,
57     help="Proceed even if the image format is not supported by the server")
58
59 _group = keepdocker_parser.add_mutually_exclusive_group()
60 _group.add_argument(
61     '--pull', action='store_true', default=False,
62     help="Try to pull the latest image from Docker registry")
63 _group.add_argument(
64     '--no-pull', action='store_false', dest='pull',
65     help="Use locally installed image only, don't pull image from Docker registry (default)")
66
67 # Combine keepdocker options listed above with run_opts options of arv-put.
68 # The options inherited from arv-put include --name, --project-uuid,
69 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
70 arg_parser = argparse.ArgumentParser(
71         description="Upload or list Docker images in Arvados",
72         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
73
74 arg_parser.add_argument(
75     'image', nargs='?',
76     help="Docker image to upload: repo, repo:tag, or hash")
77 arg_parser.add_argument(
78     'tag', nargs='?',
79     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
80
81 class DockerError(Exception):
82     pass
83
84
85 def popen_docker(cmd, *args, **kwargs):
86     manage_stdin = ('stdin' not in kwargs)
87     kwargs.setdefault('stdin', subprocess.PIPE)
88     kwargs.setdefault('stdout', subprocess.PIPE)
89     kwargs.setdefault('stderr', subprocess.PIPE)
90     try:
91         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
92     except OSError:  # No docker in $PATH, try docker.io
93         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
94     if manage_stdin:
95         docker_proc.stdin.close()
96     return docker_proc
97
98 def check_docker(proc, description):
99     proc.wait()
100     if proc.returncode != 0:
101         raise DockerError("docker {} returned status code {}".
102                           format(description, proc.returncode))
103
104 def docker_image_format(image_hash):
105     """Return the registry format ('v1' or 'v2') of the given image."""
106     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
107                         stdout=subprocess.PIPE)
108     try:
109         image_id = next(cmd.stdout).decode('utf-8').strip()
110         if image_id.startswith('sha256:'):
111             return 'v2'
112         elif ':' not in image_id:
113             return 'v1'
114         else:
115             return 'unknown'
116     finally:
117         check_docker(cmd, "inspect")
118
119 def docker_image_compatible(api, image_hash):
120     supported = api._rootDesc.get('dockerImageFormats', [])
121     if not supported:
122         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
123         return False
124
125     fmt = docker_image_format(image_hash)
126     if fmt in supported:
127         return True
128     else:
129         logger.error("image format is {!r} " \
130             "but server supports only {!r}".format(fmt, supported))
131         return False
132
133 def docker_images():
134     # Yield a DockerImage tuple for each installed image.
135     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
136     list_output = iter(list_proc.stdout)
137     next(list_output)  # Ignore the header line
138     for line in list_output:
139         words = line.split()
140         words = [word.decode('utf-8') for word in words]
141         size_index = len(words) - 2
142         repo, tag, imageid = words[:3]
143         ctime = ' '.join(words[3:size_index])
144         vsize = ' '.join(words[size_index:])
145         yield DockerImage(repo, tag, imageid, ctime, vsize)
146     list_proc.stdout.close()
147     check_docker(list_proc, "images")
148
149 def find_image_hashes(image_search, image_tag=None):
150     # Query for a Docker images with the repository and tag and return
151     # the image ids in a list.  Returns empty list if no match is
152     # found.
153
154     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
155
156     inspect = list_proc.stdout.read()
157     list_proc.stdout.close()
158
159     imageinfo = json.loads(inspect)
160
161     return [i["Id"] for i in imageinfo]
162
163 def find_one_image_hash(image_search, image_tag=None):
164     hashes = find_image_hashes(image_search, image_tag)
165     hash_count = len(hashes)
166     if hash_count == 1:
167         return hashes.pop()
168     elif hash_count == 0:
169         raise DockerError("no matching image found")
170     else:
171         raise DockerError("{} images match {}".format(hash_count, image_search))
172
173 def stat_cache_name(image_file):
174     return getattr(image_file, 'name', image_file) + '.stat'
175
176 def pull_image(image_name, image_tag):
177     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
178                  "pull")
179
180 def save_image(image_hash, image_file):
181     # Save the specified Docker image to image_file, then try to save its
182     # stats so we can try to resume after interruption.
183     check_docker(popen_docker(['save', image_hash], stdout=image_file),
184                  "save")
185     image_file.flush()
186     try:
187         with open(stat_cache_name(image_file), 'w') as statfile:
188             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
189     except STAT_CACHE_ERRORS:
190         pass  # We won't resume from this cache.  No big deal.
191
192 def get_cache_dir():
193     return arv_cmd.make_home_conf_dir(
194         os.path.join('.cache', 'arvados', 'docker'), 0o700)
195
196 def prep_image_file(filename):
197     # Return a file object ready to save a Docker image,
198     # and a boolean indicating whether or not we need to actually save the
199     # image (False if a cached save is available).
200     cache_dir = get_cache_dir()
201     if cache_dir is None:
202         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
203         need_save = True
204     else:
205         file_path = os.path.join(cache_dir, filename)
206         try:
207             with open(stat_cache_name(file_path)) as statfile:
208                 prev_stat = json.load(statfile)
209             now_stat = os.stat(file_path)
210             need_save = any(prev_stat[field] != now_stat[field]
211                             for field in [ST_MTIME, ST_SIZE])
212         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
213             need_save = True  # We couldn't compare against old stats
214         image_file = open(file_path, 'w+b' if need_save else 'rb')
215     return image_file, need_save
216
217 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
218     link_attrs.update({'link_class': link_class, 'name': link_name})
219     return api_client.links().create(body=link_attrs).execute(
220         num_retries=num_retries)
221
222 def docker_link_sort_key(link):
223     """Build a sort key to find the latest available Docker image.
224
225     To find one source collection for a Docker image referenced by
226     name or image id, the API server looks for a link with the most
227     recent `image_timestamp` property; then the most recent
228     `created_at` timestamp.  This method generates a sort key for
229     Docker metadata links to sort them from least to most preferred.
230     """
231     try:
232         image_timestamp = ciso8601.parse_datetime_as_naive(
233             link['properties']['image_timestamp'])
234     except (KeyError, ValueError):
235         image_timestamp = EARLIEST_DATETIME
236     try:
237         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
238     except ValueError:
239         created_timestamp = None
240     return (image_timestamp, created_timestamp)
241
242 def _get_docker_links(api_client, num_retries, **kwargs):
243     links = list(arvados.util.keyset_list_all(
244         api_client.links().list, num_retries=num_retries, **kwargs,
245     ))
246     for link in links:
247         link['_sort_key'] = docker_link_sort_key(link)
248     links.sort(key=itemgetter('_sort_key'), reverse=True)
249     return links
250
251 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
252     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
253     return {
254         '_sort_key': link['_sort_key'],
255         'timestamp': link['_sort_key'][timestamp_index],
256         'collection': link['head_uuid'],
257         'dockerhash': dockerhash,
258         'repo': repo,
259         'tag': tag,
260         }
261
262 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
263     """List all Docker images known to the api_client with image_name and
264     image_tag.  If no image_name is given, defaults to listing all
265     Docker images.
266
267     Returns a list of tuples representing matching Docker images,
268     sorted in preference order (i.e. the first collection in the list
269     is the one that the API server would use). Each tuple is a
270     (collection_uuid, collection_info) pair, where collection_info is
271     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
272
273     """
274     search_filters = []
275     repo_links = None
276     hash_links = None
277
278     project_filter = []
279     if project_uuid is not None:
280         project_filter = [["owner_uuid", "=", project_uuid]]
281
282     if image_name:
283         # Find images with the name the user specified.
284         search_links = _get_docker_links(
285             api_client, num_retries,
286             filters=[['link_class', '=', 'docker_image_repo+tag'],
287                      ['name', '=',
288                       '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
289         if search_links:
290             repo_links = search_links
291         else:
292             # Fall back to finding images with the specified image hash.
293             search_links = _get_docker_links(
294                 api_client, num_retries,
295                 filters=[['link_class', '=', 'docker_image_hash'],
296                          ['name', 'ilike', image_name + '%']]+project_filter)
297             hash_links = search_links
298         # Only list information about images that were found in the search.
299         search_filters.append(['head_uuid', 'in',
300                                [link['head_uuid'] for link in search_links]])
301
302     # It should be reasonable to expect that each collection only has one
303     # image hash (though there may be many links specifying this).  Find
304     # the API server's most preferred image hash link for each collection.
305     if hash_links is None:
306         hash_links = _get_docker_links(
307             api_client, num_retries,
308             filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
309     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
310
311     # Each collection may have more than one name (though again, one name
312     # may be specified more than once).  Build an image listing from name
313     # tags, sorted by API server preference.
314     if repo_links is None:
315         repo_links = _get_docker_links(
316             api_client, num_retries,
317             filters=search_filters + [['link_class', '=',
318                                        'docker_image_repo+tag']]+project_filter)
319     seen_image_names = collections.defaultdict(set)
320     images = []
321     for link in repo_links:
322         collection_uuid = link['head_uuid']
323         if link['name'] in seen_image_names[collection_uuid]:
324             continue
325         seen_image_names[collection_uuid].add(link['name'])
326         try:
327             dockerhash = hash_link_map[collection_uuid]['name']
328         except KeyError:
329             dockerhash = '<unknown>'
330         name_parts = link['name'].rsplit(':', 1)
331         images.append(_new_image_listing(link, dockerhash, *name_parts))
332
333     # Find any image hash links that did not have a corresponding name link,
334     # and add image listings for them, retaining the API server preference
335     # sorting.
336     images_start_size = len(images)
337     for collection_uuid, link in hash_link_map.items():
338         if not seen_image_names[collection_uuid]:
339             images.append(_new_image_listing(link, link['name']))
340     if len(images) > images_start_size:
341         images.sort(key=itemgetter('_sort_key'), reverse=True)
342
343     # Remove any image listings that refer to unknown collections.
344     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
345         api_client.collections().list,
346         num_retries=num_retries,
347         filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
348         select=['uuid'],
349     )}
350     return [(image['collection'], image) for image in images
351             if image['collection'] in existing_coll_uuids]
352
353 def items_owned_by(owner_uuid, arv_items):
354     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
355
356 def _uuid2pdh(api, uuid):
357     return api.collections().list(
358         filters=[['uuid', '=', uuid]],
359         select=['portable_data_hash'],
360     ).execute()['items'][0]['portable_data_hash']
361
362 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
363     args = arg_parser.parse_args(arguments)
364     if api is None:
365         api = arvados.api('v1', num_retries=args.retries)
366
367     if args.image is None or args.image == 'images':
368         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
369         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
370         try:
371             for i, j in list_images_in_arv(api, args.retries):
372                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
373         except IOError as e:
374             if e.errno == errno.EPIPE:
375                 pass
376             else:
377                 raise
378         sys.exit(0)
379
380     if re.search(r':\w[-.\w]{0,127}$', args.image):
381         # image ends with :valid-tag
382         if args.tag is not None:
383             logger.error(
384                 "image %r already includes a tag, cannot add tag argument %r",
385                 args.image, args.tag)
386             sys.exit(1)
387         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
388         args.image, args.tag = args.image.rsplit(':', 1)
389     elif args.tag is None:
390         args.tag = 'latest'
391
392     if '/' in args.image:
393         hostport, path = args.image.split('/', 1)
394         if hostport.endswith(':443'):
395             # "docker pull host:443/asdf" transparently removes the
396             # :443 (which is redundant because https is implied) and
397             # after it succeeds "docker images" will list "host/asdf",
398             # not "host:443/asdf".  If we strip the :443 then the name
399             # doesn't change underneath us.
400             args.image = '/'.join([hostport[:-4], path])
401
402     # Pull the image if requested, unless the image is specified as a hash
403     # that we already have.
404     if args.pull and not find_image_hashes(args.image):
405         pull_image(args.image, args.tag)
406
407     images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
408
409     image_hash = None
410     try:
411         image_hash = find_one_image_hash(args.image, args.tag)
412         if not docker_image_compatible(api, image_hash):
413             if args.force_image_format:
414                 logger.warning("forcing incompatible image")
415             else:
416                 logger.error("refusing to store " \
417                     "incompatible format (use --force-image-format to override)")
418                 sys.exit(1)
419     except DockerError as error:
420         if images_in_arv:
421             # We don't have Docker / we don't have the image locally,
422             # use image that's already uploaded to Arvados
423             image_hash = images_in_arv[0][1]['dockerhash']
424         else:
425             logger.error(str(error))
426             sys.exit(1)
427
428     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
429
430     if args.name is None:
431         if image_repo_tag:
432             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
433         else:
434             collection_name = 'Docker image {}'.format(image_hash[0:12])
435     else:
436         collection_name = args.name
437
438     # Acquire a lock so that only one arv-keepdocker process will
439     # dump/upload a particular docker image at a time.  Do this before
440     # checking if the image already exists in Arvados so that if there
441     # is an upload already underway, when that upload completes and
442     # this process gets a turn, it will discover the Docker image is
443     # already available and exit quickly.
444     outfile_name = '{}.tar'.format(image_hash)
445     lockfile_name = '{}.lock'.format(outfile_name)
446     lockfile = None
447     cache_dir = get_cache_dir()
448     if cache_dir:
449         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
450         fcntl.flock(lockfile, fcntl.LOCK_EX)
451
452     try:
453         if not args.force:
454             # Check if this image is already in Arvados.
455
456             # Project where everything should be owned
457             parent_project_uuid = args.project_uuid or api.users().current().execute(
458                 num_retries=args.retries)['uuid']
459
460             # Find image hash tags
461             existing_links = _get_docker_links(
462                 api, args.retries,
463                 filters=[['link_class', '=', 'docker_image_hash'],
464                          ['name', '=', image_hash]])
465             if existing_links:
466                 # get readable collections
467                 collections = api.collections().list(
468                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
469                     select=["uuid", "owner_uuid", "name", "manifest_text"]
470                     ).execute(num_retries=args.retries)['items']
471
472                 if collections:
473                     # check for repo+tag links on these collections
474                     if image_repo_tag:
475                         existing_repo_tag = _get_docker_links(
476                             api, args.retries,
477                             filters=[['link_class', '=', 'docker_image_repo+tag'],
478                                      ['name', '=', image_repo_tag],
479                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
480                     else:
481                         existing_repo_tag = []
482
483                     try:
484                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
485                     except StopIteration:
486                         # create new collection owned by the project
487                         coll_uuid = api.collections().create(
488                             body={"manifest_text": collections[0]['manifest_text'],
489                                   "name": collection_name,
490                                   "owner_uuid": parent_project_uuid,
491                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
492                             ensure_unique_name=True
493                             ).execute(num_retries=args.retries)['uuid']
494
495                     link_base = {'owner_uuid': parent_project_uuid,
496                                  'head_uuid':  coll_uuid,
497                                  'properties': existing_links[0]['properties']}
498
499                     if not any(items_owned_by(parent_project_uuid, existing_links)):
500                         # create image link owned by the project
501                         make_link(api, args.retries,
502                                   'docker_image_hash', image_hash, **link_base)
503
504                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
505                         # create repo+tag link owned by the project
506                         make_link(api, args.retries, 'docker_image_repo+tag',
507                                   image_repo_tag, **link_base)
508
509                     stdout.write(coll_uuid + "\n")
510
511                     sys.exit(0)
512
513         # Open a file for the saved image, and write it if needed.
514         image_file, need_save = prep_image_file(outfile_name)
515         if need_save:
516             save_image(image_hash, image_file)
517
518         # Call arv-put with switches we inherited from it
519         # (a.k.a., switches that aren't our own).
520         if arguments is None:
521             arguments = sys.argv[1:]
522         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
523         put_args = keepdocker_parser.parse_known_args(arguments)[1]
524
525         # Don't fail when cached manifest is invalid, just ignore the cache.
526         put_args += ['--batch']
527
528         if args.name is None:
529             put_args += ['--name', collection_name]
530
531         coll_uuid = arv_put.main(
532             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
533             install_sig_handlers=install_sig_handlers).strip()
534
535         # Managed properties could be already set
536         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
537         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
538
539         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
540
541         # Read the image metadata and make Arvados links from it.
542         image_file.seek(0)
543         image_tar = tarfile.open(fileobj=image_file)
544         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
545         if image_hash_type:
546             json_filename = raw_image_hash + '.json'
547         else:
548             json_filename = raw_image_hash + '/json'
549         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
550         image_metadata = json.loads(json_file.read().decode('utf-8'))
551         json_file.close()
552         image_tar.close()
553         link_base = {'head_uuid': coll_uuid, 'properties': {}}
554         if 'created' in image_metadata:
555             link_base['properties']['image_timestamp'] = image_metadata['created']
556         if args.project_uuid is not None:
557             link_base['owner_uuid'] = args.project_uuid
558
559         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
560         if image_repo_tag:
561             make_link(api, args.retries,
562                       'docker_image_repo+tag', image_repo_tag, **link_base)
563
564         # Clean up.
565         image_file.close()
566         for filename in [stat_cache_name(image_file), image_file.name]:
567             try:
568                 os.unlink(filename)
569             except OSError as error:
570                 if error.errno != errno.ENOENT:
571                     raise
572     finally:
573         if lockfile is not None:
574             # Closing the lockfile unlocks it.
575             lockfile.close()
576
577 if __name__ == '__main__':
578     main()