Change statfile in run.py to be able to raise OSError, and change the mock in test_pa...
[arvados.git] / sdk / python / arvados / commands / migrate19.py
1 from __future__ import print_function
2 from __future__ import division
3 import argparse
4 import time
5 import sys
6 import logging
7 import shutil
8 import tempfile
9 import os
10 import subprocess
11 import re
12
13 import arvados
14 import arvados.commands.keepdocker
15 from arvados._version import __version__
16 from arvados.collection import CollectionReader
17
18 logger = logging.getLogger('arvados.migrate-docker19')
19 logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
20                 else logging.INFO)
21
22 _migration_link_class = 'docker_image_migration'
23 _migration_link_name = 'migrate_1.9_1.10'
24
25 class MigrationFailed(Exception):
26     pass
27
28 def main(arguments=None):
29     """Docker image format migration tool for Arvados.
30
31     This converts Docker images stored in Arvados from image format v1
32     (Docker <= 1.9) to image format v2 (Docker >= 1.10).
33
34     Requires Docker running on the local host.
35
36     Usage:
37
38     1) Run arvados/docker/migrate-docker19/build.sh to create
39     arvados/migrate-docker19 Docker image.
40
41     2) Set ARVADOS_API_HOST and ARVADOS_API_TOKEN to the cluster you want to migrate.
42
43     3) Run arv-migrate-docker19 from the Arvados Python SDK on the host (not in a container).
44
45     This will query Arvados for v1 format Docker images.  For each image that
46     does not already have a corresponding v2 format image (as indicated by a
47     docker_image_migration tag) it will perform the following process:
48
49     i) download the image from Arvados
50     ii) load it into Docker
51     iii) update the Docker version, which updates the image
52     iv) save the v2 format image and upload to Arvados
53     v) create a migration link
54
55     """
56
57     migrate19_parser = argparse.ArgumentParser()
58     migrate19_parser.add_argument(
59         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
60         help='Print version and exit.')
61     migrate19_parser.add_argument(
62         '--verbose', action="store_true", help="Print stdout/stderr even on success")
63     migrate19_parser.add_argument(
64         '--force', action="store_true", help="Try to migrate even if there isn't enough space")
65
66     migrate19_parser.add_argument(
67         '--storage-driver', type=str, default="overlay",
68         help="Docker storage driver, e.g. aufs, overlay, vfs")
69
70     exgroup = migrate19_parser.add_mutually_exclusive_group()
71     exgroup.add_argument(
72         '--dry-run', action='store_true', help="Print number of pending migrations.")
73     exgroup.add_argument(
74         '--print-unmigrated', action='store_true',
75         default=False, help="Print list of images needing migration.")
76
77     migrate19_parser.add_argument('--tempdir', help="Set temporary directory")
78
79     migrate19_parser.add_argument('infile', nargs='?', type=argparse.FileType('r'),
80                                   default=None, help="List of images to be migrated")
81
82     args = migrate19_parser.parse_args(arguments)
83
84     if args.tempdir:
85         tempfile.tempdir = args.tempdir
86
87     if args.verbose:
88         logger.setLevel(logging.DEBUG)
89
90     only_migrate = None
91     if args.infile:
92         only_migrate = set()
93         for l in args.infile:
94             only_migrate.add(l.strip())
95
96     api_client  = arvados.api()
97
98     user = api_client.users().current().execute()
99     if not user['is_admin']:
100         raise Exception("This command requires an admin token")
101     sys_uuid = user['uuid'][:12] + '000000000000000'
102
103     images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3)
104
105     is_new = lambda img: img['dockerhash'].startswith('sha256:')
106
107     count_new = 0
108     old_images = []
109     for uuid, img in images:
110         if img["dockerhash"].startswith("sha256:"):
111             continue
112         key = (img["repo"], img["tag"], img["timestamp"])
113         old_images.append(img)
114
115     migration_links = arvados.util.list_all(api_client.links().list, filters=[
116         ['link_class', '=', _migration_link_class],
117         ['name', '=', _migration_link_name],
118     ])
119
120     already_migrated = set()
121     for m in migration_links:
122         already_migrated.add(m["tail_uuid"])
123
124     items = arvados.util.list_all(api_client.collections().list,
125                                   filters=[["uuid", "in", [img["collection"] for img in old_images]]],
126                                   select=["uuid", "portable_data_hash", "manifest_text", "owner_uuid"])
127     uuid_to_collection = {i["uuid"]: i for i in items}
128
129     need_migrate = {}
130     totalbytes = 0
131     biggest = 0
132     biggest_pdh = None
133     for img in old_images:
134         i = uuid_to_collection[img["collection"]]
135         pdh = i["portable_data_hash"]
136         if pdh not in already_migrated and pdh not in need_migrate and (only_migrate is None or pdh in only_migrate):
137             need_migrate[pdh] = img
138             with CollectionReader(i["manifest_text"]) as c:
139                 size = list(c.values())[0].size()
140                 if size > biggest:
141                     biggest = size
142                     biggest_pdh = pdh
143                 totalbytes += size
144
145
146     if args.storage_driver == "vfs":
147         will_need = (biggest*20)
148     else:
149         will_need = (biggest*2.5)
150
151     if args.print_unmigrated:
152         only_migrate = set()
153         for pdh in need_migrate:
154             print(pdh)
155         return
156
157     logger.info("Already migrated %i images", len(already_migrated))
158     logger.info("Need to migrate %i images", len(need_migrate))
159     logger.info("Using tempdir %s", tempfile.gettempdir())
160     logger.info("Biggest image %s is about %i MiB", biggest_pdh, biggest>>20)
161     logger.info("Total data to migrate about %i MiB", totalbytes>>20)
162
163     df_out = subprocess.check_output(["df", "-B1", tempfile.gettempdir()])
164     ln = df_out.splitlines()[1]
165     filesystem, blocks, used, available, use_pct, mounted = re.match(r"^([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+)", ln).groups(1)
166     if int(available) <= will_need:
167         logger.warn("Temp filesystem mounted at %s does not have enough space for biggest image (has %i MiB, needs %i MiB)", mounted, int(available)>>20, will_need>>20)
168         if not args.force:
169             exit(1)
170         else:
171             logger.warn("--force provided, will migrate anyway")
172
173     if args.dry_run:
174         return
175
176     success = []
177     failures = []
178     count = 1
179     for old_image in list(need_migrate.values()):
180         if uuid_to_collection[old_image["collection"]]["portable_data_hash"] in already_migrated:
181             continue
182
183         oldcol = CollectionReader(uuid_to_collection[old_image["collection"]]["manifest_text"])
184         tarfile = list(oldcol.keys())[0]
185
186         logger.info("[%i/%i] Migrating %s:%s (%s) (%i MiB)", count, len(need_migrate), old_image["repo"],
187                     old_image["tag"], old_image["collection"], list(oldcol.values())[0].size()>>20)
188         count += 1
189         start = time.time()
190
191         varlibdocker = tempfile.mkdtemp()
192         dockercache = tempfile.mkdtemp()
193         try:
194             with tempfile.NamedTemporaryFile() as envfile:
195                 envfile.write("ARVADOS_API_HOST=%s\n" % (arvados.config.get("ARVADOS_API_HOST")))
196                 envfile.write("ARVADOS_API_TOKEN=%s\n" % (arvados.config.get("ARVADOS_API_TOKEN")))
197                 if arvados.config.get("ARVADOS_API_HOST_INSECURE"):
198                     envfile.write("ARVADOS_API_HOST_INSECURE=%s\n" % (arvados.config.get("ARVADOS_API_HOST_INSECURE")))
199                 envfile.flush()
200
201                 dockercmd = ["docker", "run",
202                              "--privileged",
203                              "--rm",
204                              "--env-file", envfile.name,
205                              "--volume", "%s:/var/lib/docker" % varlibdocker,
206                              "--volume", "%s:/root/.cache/arvados/docker" % dockercache,
207                              "arvados/migrate-docker19:1.0",
208                              "/root/migrate.sh",
209                              "%s/%s" % (old_image["collection"], tarfile),
210                              tarfile[0:40],
211                              old_image["repo"],
212                              old_image["tag"],
213                              uuid_to_collection[old_image["collection"]]["owner_uuid"],
214                              args.storage_driver]
215
216                 proc = subprocess.Popen(dockercmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
217                 out, err = proc.communicate()
218
219                 initial_space = re.search(r"Initial available space is (\d+)", out)
220                 imgload_space = re.search(r"Available space after image load is (\d+)", out)
221                 imgupgrade_space = re.search(r"Available space after image upgrade is (\d+)", out)
222                 keepdocker_space = re.search(r"Available space after arv-keepdocker is (\d+)", out)
223                 cleanup_space = re.search(r"Available space after cleanup is (\d+)", out)
224
225                 if initial_space:
226                     isp = int(initial_space.group(1))
227                     logger.info("Available space initially: %i MiB", (isp)/(2**20))
228                     if imgload_space:
229                         sp = int(imgload_space.group(1))
230                         logger.debug("Used after load: %i MiB", (isp-sp)/(2**20))
231                     if imgupgrade_space:
232                         sp = int(imgupgrade_space.group(1))
233                         logger.debug("Used after upgrade: %i MiB", (isp-sp)/(2**20))
234                     if keepdocker_space:
235                         sp = int(keepdocker_space.group(1))
236                         logger.info("Used after upload: %i MiB", (isp-sp)/(2**20))
237
238                 if cleanup_space:
239                     sp = int(cleanup_space.group(1))
240                     logger.debug("Available after cleanup: %i MiB", (sp)/(2**20))
241
242                 if proc.returncode != 0:
243                     logger.error("Failed with return code %i", proc.returncode)
244                     logger.error("--- Stdout ---\n%s", out)
245                     logger.error("--- Stderr ---\n%s", err)
246                     raise MigrationFailed()
247
248                 if args.verbose:
249                     logger.info("--- Stdout ---\n%s", out)
250                     logger.info("--- Stderr ---\n%s", err)
251
252             migrated = re.search(r"Migrated uuid is ([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15})", out)
253             if migrated:
254                 newcol = CollectionReader(migrated.group(1))
255
256                 api_client.links().create(body={"link": {
257                     'owner_uuid': sys_uuid,
258                     'link_class': _migration_link_class,
259                     'name': _migration_link_name,
260                     'tail_uuid': oldcol.portable_data_hash(),
261                     'head_uuid': newcol.portable_data_hash()
262                     }}).execute(num_retries=3)
263
264                 logger.info("Migrated '%s' (%s) to '%s' (%s) in %is",
265                             oldcol.portable_data_hash(), old_image["collection"],
266                             newcol.portable_data_hash(), migrated.group(1),
267                             time.time() - start)
268                 already_migrated.add(oldcol.portable_data_hash())
269                 success.append(old_image["collection"])
270             else:
271                 logger.error("Error migrating '%s'", old_image["collection"])
272                 failures.append(old_image["collection"])
273         except Exception as e:
274             logger.error("Failed to migrate %s in %is", old_image["collection"], time.time() - start,
275                          exc_info=(not isinstance(e, MigrationFailed)))
276             failures.append(old_image["collection"])
277         finally:
278             shutil.rmtree(varlibdocker)
279             shutil.rmtree(dockercache)
280
281     logger.info("Successfully migrated %i images", len(success))
282     if failures:
283         logger.error("Failed to migrate %i images", len(failures))