18947: Move arvados-dispatch-slurm into arvados-server binary.
[arvados.git] / sdk / python / arvados / commands / keepdocker.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from builtins import next
6 import argparse
7 import collections
8 import datetime
9 import errno
10 import json
11 import os
12 import re
13 import sys
14 import tarfile
15 import tempfile
16 import shutil
17 import _strptime
18 import fcntl
19 from operator import itemgetter
20 from stat import *
21
22 if os.name == "posix" and sys.version_info[0] < 3:
23     import subprocess32 as subprocess
24 else:
25     import subprocess
26
27 import arvados
28 import arvados.util
29 import arvados.commands._util as arv_cmd
30 import arvados.commands.put as arv_put
31 from arvados.collection import CollectionReader
32 import ciso8601
33 import logging
34 import arvados.config
35
36 from arvados._version import __version__
37
38 logger = logging.getLogger('arvados.keepdocker')
39 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
40                 else logging.INFO)
41
42 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
43 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
44
45 DockerImage = collections.namedtuple(
46     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
47
48 keepdocker_parser = argparse.ArgumentParser(add_help=False)
49 keepdocker_parser.add_argument(
50     '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
51     help='Print version and exit.')
52 keepdocker_parser.add_argument(
53     '-f', '--force', action='store_true', default=False,
54     help="Re-upload the image even if it already exists on the server")
55 keepdocker_parser.add_argument(
56     '--force-image-format', action='store_true', default=False,
57     help="Proceed even if the image format is not supported by the server")
58
59 _group = keepdocker_parser.add_mutually_exclusive_group()
60 _group.add_argument(
61     '--pull', action='store_true', default=False,
62     help="Try to pull the latest image from Docker registry")
63 _group.add_argument(
64     '--no-pull', action='store_false', dest='pull',
65     help="Use locally installed image only, don't pull image from Docker registry (default)")
66
67 # Combine keepdocker options listed above with run_opts options of arv-put.
68 # The options inherited from arv-put include --name, --project-uuid,
69 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
70 arg_parser = argparse.ArgumentParser(
71         description="Upload or list Docker images in Arvados",
72         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
73
74 arg_parser.add_argument(
75     'image', nargs='?',
76     help="Docker image to upload: repo, repo:tag, or hash")
77 arg_parser.add_argument(
78     'tag', nargs='?',
79     help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
80
81 class DockerError(Exception):
82     pass
83
84
85 def popen_docker(cmd, *args, **kwargs):
86     manage_stdin = ('stdin' not in kwargs)
87     kwargs.setdefault('stdin', subprocess.PIPE)
88     kwargs.setdefault('stdout', sys.stderr)
89     try:
90         docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
91     except OSError:  # No docker in $PATH, try docker.io
92         docker_proc = subprocess.Popen(['docker.io'] + cmd, *args, **kwargs)
93     if manage_stdin:
94         docker_proc.stdin.close()
95     return docker_proc
96
97 def check_docker(proc, description):
98     proc.wait()
99     if proc.returncode != 0:
100         raise DockerError("docker {} returned status code {}".
101                           format(description, proc.returncode))
102
103 def docker_image_format(image_hash):
104     """Return the registry format ('v1' or 'v2') of the given image."""
105     cmd = popen_docker(['inspect', '--format={{.Id}}', image_hash],
106                         stdout=subprocess.PIPE)
107     try:
108         image_id = next(cmd.stdout).decode('utf-8').strip()
109         if image_id.startswith('sha256:'):
110             return 'v2'
111         elif ':' not in image_id:
112             return 'v1'
113         else:
114             return 'unknown'
115     finally:
116         check_docker(cmd, "inspect")
117
118 def docker_image_compatible(api, image_hash):
119     supported = api._rootDesc.get('dockerImageFormats', [])
120     if not supported:
121         logger.warning("server does not specify supported image formats (see docker_image_formats in server config).")
122         return False
123
124     fmt = docker_image_format(image_hash)
125     if fmt in supported:
126         return True
127     else:
128         logger.error("image format is {!r} " \
129             "but server supports only {!r}".format(fmt, supported))
130         return False
131
132 def docker_images():
133     # Yield a DockerImage tuple for each installed image.
134     list_proc = popen_docker(['images', '--no-trunc'], stdout=subprocess.PIPE)
135     list_output = iter(list_proc.stdout)
136     next(list_output)  # Ignore the header line
137     for line in list_output:
138         words = line.split()
139         words = [word.decode('utf-8') for word in words]
140         size_index = len(words) - 2
141         repo, tag, imageid = words[:3]
142         ctime = ' '.join(words[3:size_index])
143         vsize = ' '.join(words[size_index:])
144         yield DockerImage(repo, tag, imageid, ctime, vsize)
145     list_proc.stdout.close()
146     check_docker(list_proc, "images")
147
148 def find_image_hashes(image_search, image_tag=None):
149     # Query for a Docker images with the repository and tag and return
150     # the image ids in a list.  Returns empty list if no match is
151     # found.
152
153     list_proc = popen_docker(['inspect', "%s%s" % (image_search, ":"+image_tag if image_tag else "")], stdout=subprocess.PIPE)
154
155     inspect = list_proc.stdout.read()
156     list_proc.stdout.close()
157
158     imageinfo = json.loads(inspect)
159
160     return [i["Id"] for i in imageinfo]
161
162 def find_one_image_hash(image_search, image_tag=None):
163     hashes = find_image_hashes(image_search, image_tag)
164     hash_count = len(hashes)
165     if hash_count == 1:
166         return hashes.pop()
167     elif hash_count == 0:
168         raise DockerError("no matching image found")
169     else:
170         raise DockerError("{} images match {}".format(hash_count, image_search))
171
172 def stat_cache_name(image_file):
173     return getattr(image_file, 'name', image_file) + '.stat'
174
175 def pull_image(image_name, image_tag):
176     check_docker(popen_docker(['pull', '{}:{}'.format(image_name, image_tag)]),
177                  "pull")
178
179 def save_image(image_hash, image_file):
180     # Save the specified Docker image to image_file, then try to save its
181     # stats so we can try to resume after interruption.
182     check_docker(popen_docker(['save', image_hash], stdout=image_file),
183                  "save")
184     image_file.flush()
185     try:
186         with open(stat_cache_name(image_file), 'w') as statfile:
187             json.dump(tuple(os.fstat(image_file.fileno())), statfile)
188     except STAT_CACHE_ERRORS:
189         pass  # We won't resume from this cache.  No big deal.
190
191 def get_cache_dir():
192     return arv_cmd.make_home_conf_dir(
193         os.path.join('.cache', 'arvados', 'docker'), 0o700)
194
195 def prep_image_file(filename):
196     # Return a file object ready to save a Docker image,
197     # and a boolean indicating whether or not we need to actually save the
198     # image (False if a cached save is available).
199     cache_dir = get_cache_dir()
200     if cache_dir is None:
201         image_file = tempfile.NamedTemporaryFile(suffix='.tar')
202         need_save = True
203     else:
204         file_path = os.path.join(cache_dir, filename)
205         try:
206             with open(stat_cache_name(file_path)) as statfile:
207                 prev_stat = json.load(statfile)
208             now_stat = os.stat(file_path)
209             need_save = any(prev_stat[field] != now_stat[field]
210                             for field in [ST_MTIME, ST_SIZE])
211         except STAT_CACHE_ERRORS + (AttributeError, IndexError):
212             need_save = True  # We couldn't compare against old stats
213         image_file = open(file_path, 'w+b' if need_save else 'rb')
214     return image_file, need_save
215
216 def make_link(api_client, num_retries, link_class, link_name, **link_attrs):
217     link_attrs.update({'link_class': link_class, 'name': link_name})
218     return api_client.links().create(body=link_attrs).execute(
219         num_retries=num_retries)
220
221 def docker_link_sort_key(link):
222     """Build a sort key to find the latest available Docker image.
223
224     To find one source collection for a Docker image referenced by
225     name or image id, the API server looks for a link with the most
226     recent `image_timestamp` property; then the most recent
227     `created_at` timestamp.  This method generates a sort key for
228     Docker metadata links to sort them from least to most preferred.
229     """
230     try:
231         image_timestamp = ciso8601.parse_datetime_as_naive(
232             link['properties']['image_timestamp'])
233     except (KeyError, ValueError):
234         image_timestamp = EARLIEST_DATETIME
235     try:
236         created_timestamp = ciso8601.parse_datetime_as_naive(link['created_at'])
237     except ValueError:
238         created_timestamp = None
239     return (image_timestamp, created_timestamp)
240
241 def _get_docker_links(api_client, num_retries, **kwargs):
242     links = arvados.util.list_all(api_client.links().list,
243                                   num_retries, **kwargs)
244     for link in links:
245         link['_sort_key'] = docker_link_sort_key(link)
246     links.sort(key=itemgetter('_sort_key'), reverse=True)
247     return links
248
249 def _new_image_listing(link, dockerhash, repo='<none>', tag='<none>'):
250     timestamp_index = 1 if (link['_sort_key'][0] is EARLIEST_DATETIME) else 0
251     return {
252         '_sort_key': link['_sort_key'],
253         'timestamp': link['_sort_key'][timestamp_index],
254         'collection': link['head_uuid'],
255         'dockerhash': dockerhash,
256         'repo': repo,
257         'tag': tag,
258         }
259
260 def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
261     """List all Docker images known to the api_client with image_name and
262     image_tag.  If no image_name is given, defaults to listing all
263     Docker images.
264
265     Returns a list of tuples representing matching Docker images,
266     sorted in preference order (i.e. the first collection in the list
267     is the one that the API server would use). Each tuple is a
268     (collection_uuid, collection_info) pair, where collection_info is
269     a dict with fields "dockerhash", "repo", "tag", and "timestamp".
270
271     """
272     search_filters = []
273     repo_links = None
274     hash_links = None
275     if image_name:
276         # Find images with the name the user specified.
277         search_links = _get_docker_links(
278             api_client, num_retries,
279             filters=[['link_class', '=', 'docker_image_repo+tag'],
280                      ['name', '=',
281                       '{}:{}'.format(image_name, image_tag or 'latest')]])
282         if search_links:
283             repo_links = search_links
284         else:
285             # Fall back to finding images with the specified image hash.
286             search_links = _get_docker_links(
287                 api_client, num_retries,
288                 filters=[['link_class', '=', 'docker_image_hash'],
289                          ['name', 'ilike', image_name + '%']])
290             hash_links = search_links
291         # Only list information about images that were found in the search.
292         search_filters.append(['head_uuid', 'in',
293                                [link['head_uuid'] for link in search_links]])
294
295     # It should be reasonable to expect that each collection only has one
296     # image hash (though there may be many links specifying this).  Find
297     # the API server's most preferred image hash link for each collection.
298     if hash_links is None:
299         hash_links = _get_docker_links(
300             api_client, num_retries,
301             filters=search_filters + [['link_class', '=', 'docker_image_hash']])
302     hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
303
304     # Each collection may have more than one name (though again, one name
305     # may be specified more than once).  Build an image listing from name
306     # tags, sorted by API server preference.
307     if repo_links is None:
308         repo_links = _get_docker_links(
309             api_client, num_retries,
310             filters=search_filters + [['link_class', '=',
311                                        'docker_image_repo+tag']])
312     seen_image_names = collections.defaultdict(set)
313     images = []
314     for link in repo_links:
315         collection_uuid = link['head_uuid']
316         if link['name'] in seen_image_names[collection_uuid]:
317             continue
318         seen_image_names[collection_uuid].add(link['name'])
319         try:
320             dockerhash = hash_link_map[collection_uuid]['name']
321         except KeyError:
322             dockerhash = '<unknown>'
323         name_parts = link['name'].split(':', 1)
324         images.append(_new_image_listing(link, dockerhash, *name_parts))
325
326     # Find any image hash links that did not have a corresponding name link,
327     # and add image listings for them, retaining the API server preference
328     # sorting.
329     images_start_size = len(images)
330     for collection_uuid, link in hash_link_map.items():
331         if not seen_image_names[collection_uuid]:
332             images.append(_new_image_listing(link, link['name']))
333     if len(images) > images_start_size:
334         images.sort(key=itemgetter('_sort_key'), reverse=True)
335
336     # Remove any image listings that refer to unknown collections.
337     existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
338             api_client.collections().list, num_retries,
339             filters=[['uuid', 'in', [im['collection'] for im in images]]],
340             select=['uuid'])}
341     return [(image['collection'], image) for image in images
342             if image['collection'] in existing_coll_uuids]
343
344 def items_owned_by(owner_uuid, arv_items):
345     return (item for item in arv_items if item['owner_uuid'] == owner_uuid)
346
347 def _uuid2pdh(api, uuid):
348     return api.collections().list(
349         filters=[['uuid', '=', uuid]],
350         select=['portable_data_hash'],
351     ).execute()['items'][0]['portable_data_hash']
352
353 def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
354     args = arg_parser.parse_args(arguments)
355     if api is None:
356         api = arvados.api('v1')
357
358     if args.image is None or args.image == 'images':
359         fmt = "{:30}  {:10}  {:12}  {:29}  {:20}\n"
360         stdout.write(fmt.format("REPOSITORY", "TAG", "IMAGE ID", "COLLECTION", "CREATED"))
361         try:
362             for i, j in list_images_in_arv(api, args.retries):
363                 stdout.write(fmt.format(j["repo"], j["tag"], j["dockerhash"][0:12], i, j["timestamp"].strftime("%c")))
364         except IOError as e:
365             if e.errno == errno.EPIPE:
366                 pass
367             else:
368                 raise
369         sys.exit(0)
370
371     if re.search(r':\w[-.\w]{0,127}$', args.image):
372         # image ends with :valid-tag
373         if args.tag is not None:
374             logger.error(
375                 "image %r already includes a tag, cannot add tag argument %r",
376                 args.image, args.tag)
377             sys.exit(1)
378         # rsplit() accommodates "myrepo.example:8888/repo/image:tag"
379         args.image, args.tag = args.image.rsplit(':', 1)
380     elif args.tag is None:
381         args.tag = 'latest'
382
383     # Pull the image if requested, unless the image is specified as a hash
384     # that we already have.
385     if args.pull and not find_image_hashes(args.image):
386         pull_image(args.image, args.tag)
387
388     try:
389         image_hash = find_one_image_hash(args.image, args.tag)
390     except DockerError as error:
391         logger.error(str(error))
392         sys.exit(1)
393
394     if not docker_image_compatible(api, image_hash):
395         if args.force_image_format:
396             logger.warning("forcing incompatible image")
397         else:
398             logger.error("refusing to store " \
399                 "incompatible format (use --force-image-format to override)")
400             sys.exit(1)
401
402     image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
403
404     if args.name is None:
405         if image_repo_tag:
406             collection_name = 'Docker image {} {}'.format(image_repo_tag.replace("/", " "), image_hash[0:12])
407         else:
408             collection_name = 'Docker image {}'.format(image_hash[0:12])
409     else:
410         collection_name = args.name
411
412     # Acquire a lock so that only one arv-keepdocker process will
413     # dump/upload a particular docker image at a time.  Do this before
414     # checking if the image already exists in Arvados so that if there
415     # is an upload already underway, when that upload completes and
416     # this process gets a turn, it will discover the Docker image is
417     # already available and exit quickly.
418     outfile_name = '{}.tar'.format(image_hash)
419     lockfile_name = '{}.lock'.format(outfile_name)
420     lockfile = None
421     cache_dir = get_cache_dir()
422     if cache_dir:
423         lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
424         fcntl.flock(lockfile, fcntl.LOCK_EX)
425
426     try:
427         if not args.force:
428             # Check if this image is already in Arvados.
429
430             # Project where everything should be owned
431             parent_project_uuid = args.project_uuid or api.users().current().execute(
432                 num_retries=args.retries)['uuid']
433
434             # Find image hash tags
435             existing_links = _get_docker_links(
436                 api, args.retries,
437                 filters=[['link_class', '=', 'docker_image_hash'],
438                          ['name', '=', image_hash]])
439             if existing_links:
440                 # get readable collections
441                 collections = api.collections().list(
442                     filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
443                     select=["uuid", "owner_uuid", "name", "manifest_text"]
444                     ).execute(num_retries=args.retries)['items']
445
446                 if collections:
447                     # check for repo+tag links on these collections
448                     if image_repo_tag:
449                         existing_repo_tag = _get_docker_links(
450                             api, args.retries,
451                             filters=[['link_class', '=', 'docker_image_repo+tag'],
452                                      ['name', '=', image_repo_tag],
453                                      ['head_uuid', 'in', [c["uuid"] for c in collections]]])
454                     else:
455                         existing_repo_tag = []
456
457                     try:
458                         coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
459                     except StopIteration:
460                         # create new collection owned by the project
461                         coll_uuid = api.collections().create(
462                             body={"manifest_text": collections[0]['manifest_text'],
463                                   "name": collection_name,
464                                   "owner_uuid": parent_project_uuid,
465                                   "properties": {"docker-image-repo-tag": image_repo_tag}},
466                             ensure_unique_name=True
467                             ).execute(num_retries=args.retries)['uuid']
468
469                     link_base = {'owner_uuid': parent_project_uuid,
470                                  'head_uuid':  coll_uuid,
471                                  'properties': existing_links[0]['properties']}
472
473                     if not any(items_owned_by(parent_project_uuid, existing_links)):
474                         # create image link owned by the project
475                         make_link(api, args.retries,
476                                   'docker_image_hash', image_hash, **link_base)
477
478                     if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
479                         # create repo+tag link owned by the project
480                         make_link(api, args.retries, 'docker_image_repo+tag',
481                                   image_repo_tag, **link_base)
482
483                     stdout.write(coll_uuid + "\n")
484
485                     sys.exit(0)
486
487         # Open a file for the saved image, and write it if needed.
488         image_file, need_save = prep_image_file(outfile_name)
489         if need_save:
490             save_image(image_hash, image_file)
491
492         # Call arv-put with switches we inherited from it
493         # (a.k.a., switches that aren't our own).
494         if arguments is None:
495             arguments = sys.argv[1:]
496         arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
497         put_args = keepdocker_parser.parse_known_args(arguments)[1]
498
499         # Don't fail when cached manifest is invalid, just ignore the cache.
500         put_args += ['--batch']
501
502         if args.name is None:
503             put_args += ['--name', collection_name]
504
505         coll_uuid = arv_put.main(
506             put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
507             install_sig_handlers=install_sig_handlers).strip()
508
509         # Managed properties could be already set
510         coll_properties = api.collections().get(uuid=coll_uuid).execute(num_retries=args.retries).get('properties', {})
511         coll_properties.update({"docker-image-repo-tag": image_repo_tag})
512
513         api.collections().update(uuid=coll_uuid, body={"properties": coll_properties}).execute(num_retries=args.retries)
514
515         # Read the image metadata and make Arvados links from it.
516         image_file.seek(0)
517         image_tar = tarfile.open(fileobj=image_file)
518         image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
519         if image_hash_type:
520             json_filename = raw_image_hash + '.json'
521         else:
522             json_filename = raw_image_hash + '/json'
523         json_file = image_tar.extractfile(image_tar.getmember(json_filename))
524         image_metadata = json.loads(json_file.read().decode('utf-8'))
525         json_file.close()
526         image_tar.close()
527         link_base = {'head_uuid': coll_uuid, 'properties': {}}
528         if 'created' in image_metadata:
529             link_base['properties']['image_timestamp'] = image_metadata['created']
530         if args.project_uuid is not None:
531             link_base['owner_uuid'] = args.project_uuid
532
533         make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
534         if image_repo_tag:
535             make_link(api, args.retries,
536                       'docker_image_repo+tag', image_repo_tag, **link_base)
537
538         # Clean up.
539         image_file.close()
540         for filename in [stat_cache_name(image_file), image_file.name]:
541             try:
542                 os.unlink(filename)
543             except OSError as error:
544                 if error.errno != errno.ENOENT:
545                     raise
546     finally:
547         if lockfile is not None:
548             # Closing the lockfile unlocks it.
549             lockfile.close()
550
551 if __name__ == '__main__':
552     main()