Merge branch 'master' into 3699-arv-copy
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31
32 logger = logging.getLogger('arvados.arv-copy')
33
34 # local_repo_dir records which git repositories from the Arvados source
35 # instance have been checked out locally during this run, and to which
36 # directories.
37 # e.g. if repository 'twp' from src_arv has been cloned into
38 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
39 #
40 local_repo_dir = {}
41
42 def main():
43     parser = argparse.ArgumentParser(
44         description='Copy a pipeline instance from one Arvados instance to another.')
45
46     parser.add_argument(
47         '-v', '--verbose', dest='verbose', action='store_true',
48         help='Verbose output.')
49     parser.add_argument(
50         '--src', dest='source_arvados', required=True,
51         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
52     parser.add_argument(
53         '--dst', dest='destination_arvados', required=True,
54         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
55     parser.add_argument(
56         '--recursive', dest='recursive', action='store_true',
57         help='Recursively copy any dependencies for this object. (default)')
58     parser.add_argument(
59         '--no-recursive', dest='recursive', action='store_false',
60         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
61     parser.add_argument(
62         '--dst-git-repo', dest='dst_git_repo',
63         help='The name of the destination git repository. Required when copying a pipeline recursively.')
64     parser.add_argument(
65         '--project-uuid', dest='project_uuid',
66         help='The UUID of the project at the destination to which the pipeline should be copied.')
67     parser.add_argument(
68         'object_uuid',
69         help='The UUID of the object to be copied.')
70     parser.set_defaults(recursive=True)
71
72     args = parser.parse_args()
73
74     if args.verbose:
75         logger.setLevel(logging.DEBUG)
76     else:
77         logger.setLevel(logging.INFO)
78
79     # Create API clients for the source and destination instances
80     src_arv = api_for_instance(args.source_arvados)
81     dst_arv = api_for_instance(args.destination_arvados)
82
83     # Identify the kind of object we have been given, and begin copying.
84     t = uuid_type(src_arv, args.object_uuid)
85     if t == 'Collection':
86         result = copy_collection(args.object_uuid, src=src_arv, dst=dst_arv)
87     elif t == 'PipelineInstance':
88         result = copy_pipeline_instance(args.object_uuid,
89                                         src_arv, dst_arv,
90                                         args.dst_git_repo,
91                                         dst_project=args.project_uuid,
92                                         recursive=args.recursive)
93     elif t == 'PipelineTemplate':
94         result = copy_pipeline_template(args.object_uuid,
95                                         src_arv, dst_arv,
96                                         args.dst_git_repo,
97                                         recursive=args.recursive)
98     else:
99         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
100
101     # Clean up any outstanding temp git repositories.
102     for d in local_repo_dir.values():
103         shutil.rmtree(d, ignore_errors=True)
104
105     # If no exception was thrown and the response does not have an
106     # error_token field, presume success
107     if 'error_token' in result or 'uuid' not in result:
108         logger.error("API server returned an error result: {}".format(result))
109         exit(1)
110
111     logger.info("")
112     logger.info("Success: created copy with uuid {}".format(result['uuid']))
113     exit(0)
114
115 # api_for_instance(instance_name)
116 #
117 #     Creates an API client for the Arvados instance identified by
118 #     instance_name.
119 #
120 #     If instance_name contains a slash, it is presumed to be a path
121 #     (either local or absolute) to a file with Arvados configuration
122 #     settings.
123 #
124 #     Otherwise, it is presumed to be the name of a file in
125 #     $HOME/.config/arvados/instance_name.conf
126 #
127 def api_for_instance(instance_name):
128     if '/' in instance_name:
129         config_file = instance_name
130     else:
131         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
132
133     try:
134         cfg = arvados.config.load(config_file)
135     except (IOError, OSError) as e:
136         abort(("Could not open config file {}: {}\n" +
137                "You must make sure that your configuration tokens\n" +
138                "for Arvados instance {} are in {} and that this\n" +
139                "file is readable.").format(
140                    config_file, e, instance_name, config_file))
141
142     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
143         api_is_insecure = (
144             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
145                 ['1', 't', 'true', 'y', 'yes']))
146         client = arvados.api('v1',
147                              host=cfg['ARVADOS_API_HOST'],
148                              token=cfg['ARVADOS_API_TOKEN'],
149                              insecure=api_is_insecure,
150                              cache=False)
151     else:
152         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
153     return client
154
155 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
156 #
157 #    Copies a pipeline instance identified by pi_uuid from src to dst.
158 #
159 #    If the 'recursive' option evaluates to True:
160 #      1. Copies all input collections
161 #           * For each component in the pipeline, include all collections
162 #             listed as job dependencies for that component)
163 #      2. Copy docker images
164 #      3. Copy git repositories
165 #      4. Copy the pipeline template
166 #
167 #    The only changes made to the copied pipeline instance are:
168 #      1. The original pipeline instance UUID is preserved in
169 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
170 #      2. The pipeline_template_uuid is changed to the new template uuid.
171 #      3. The owner_uuid of the instance is changed to the user who
172 #         copied it.
173 #
174 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
175     # Fetch the pipeline instance record.
176     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
177
178     if recursive:
179         if not dst_git_repo:
180             abort('--dst-git-repo is required when copying a pipeline recursively.')
181         # Copy the pipeline template and save the copied template.
182         if pi.get('pipeline_template_uuid', None):
183             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
184                                         src, dst,
185                                         dst_git_repo,
186                                         recursive=True)
187
188         # Copy input collections, docker images and git repos.
189         pi = copy_collections(pi, src, dst)
190         copy_git_repos(pi, src, dst, dst_git_repo)
191
192         # Update the fields of the pipeline instance with the copied
193         # pipeline template.
194         if pi.get('pipeline_template_uuid', None):
195             pi['pipeline_template_uuid'] = pt['uuid']
196
197     else:
198         # not recursive
199         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
200         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
201
202     # Update the pipeline instance properties, and create the new
203     # instance at dst.
204     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
205     if dst_project:
206         pi['owner_uuid'] = dst_project
207     else:
208         del pi['owner_uuid']
209     del pi['uuid']
210     pi['ensure_unique_name'] = True
211
212     new_pi = dst.pipeline_instances().create(body=pi).execute()
213     return new_pi
214
215 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
216 #
217 #    Copies a pipeline template identified by pt_uuid from src to dst.
218 #
219 #    If the 'recursive' option evaluates to true, also copy any collections,
220 #    docker images and git repositories that this template references.
221 #
222 #    The owner_uuid of the new template is changed to that of the user
223 #    who copied the template.
224 #
225 #    Returns the copied pipeline template object.
226 #
227 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
228     # fetch the pipeline template from the source instance
229     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
230
231     if recursive:
232         if not dst_git_repo:
233             abort('--dst-git-repo is required when copying a pipeline recursively.')
234         # Copy input collections, docker images and git repos.
235         pt = copy_collections(pt, src, dst)
236         copy_git_repos(pt, src, dst, dst_git_repo)
237
238     pt['name'] = pt['name'] + ' copy'
239     pt['ensure_unique_name'] = True
240     del pt['uuid']
241     del pt['owner_uuid']
242
243     return dst.pipeline_templates().create(body=pt).execute()
244
245 # copy_collections(obj, src, dst)
246 #
247 #    Recursively copies all collections referenced by 'obj' from src
248 #    to dst.
249 #
250 #    Returns a copy of obj with any old collection uuids replaced by
251 #    the new ones.
252 #
253 def copy_collections(obj, src, dst):
254     if type(obj) in [str, unicode]:
255         if uuid_type(src, obj) == 'Collection':
256             newc = copy_collection(obj, src, dst)
257             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
258                 return newc['uuid']
259         return obj
260     elif type(obj) == dict:
261         return {v: copy_collections(obj[v], src, dst) for v in obj}
262     elif type(obj) == list:
263         return [copy_collections(v, src, dst) for v in obj]
264     return obj
265
266 # copy_git_repos(p, src, dst, dst_repo)
267 #
268 #    Copies all git repositories referenced by pipeline instance or
269 #    template 'p' from src to dst.
270 #
271 #    For each component c in the pipeline:
272 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
273 #      * Rename script versions:
274 #          * c['script_version']
275 #          * c['job']['script_version']
276 #          * c['job']['supplied_script_version']
277 #        to the commit hashes they resolve to, since any symbolic
278 #        names (tags, branches) are not preserved in the destination repo.
279 #
280 #    The pipeline object is updated in place with the new repository
281 #    names.  The return value is undefined.
282 #
283 def copy_git_repos(p, src, dst, dst_repo):
284     copied = set()
285     for c in p['components']:
286         component = p['components'][c]
287         if 'repository' in component:
288             repo = component['repository']
289             if repo not in copied:
290                 copy_git_repo(repo, src, dst, dst_repo)
291                 copied.add(repo)
292             component['repository'] = dst_repo
293             if 'script_version' in component:
294                 repo_dir = local_repo_dir[repo]
295                 component['script_version'] = git_rev_parse(component['script_version'], repo_dir)
296         if 'job' in component:
297             j = component['job']
298             if 'repository' in j:
299                 repo = j['repository']
300                 if repo not in copied:
301                     copy_git_repo(repo, src, dst, dst_repo)
302                     copied.add(repo)
303                 j['repository'] = dst_repo
304                 repo_dir = local_repo_dir[repo]
305                 if 'script_version' in j:
306                     j['script_version'] = git_rev_parse(j['script_version'], repo_dir)
307                 if 'supplied_script_version' in j:
308                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
309
310 # copy_collection(obj_uuid, src, dst)
311 #
312 #    Copies the collection identified by obj_uuid from src to dst.
313 #    Returns the collection object created at dst.
314 #
315 #    For this application, it is critical to preserve the
316 #    collection's manifest hash, which is not guaranteed with the
317 #    arvados.CollectionReader and arvados.CollectionWriter classes.
318 #    Copying each block in the collection manually, followed by
319 #    the manifest block, ensures that the collection's manifest
320 #    hash will not change.
321 #
322 def copy_collection(obj_uuid, src, dst):
323     c = src.collections().get(uuid=obj_uuid).execute()
324
325     # Check whether a collection with this hash already exists
326     # at the destination.  If so, just return that collection.
327     if 'portable_data_hash' in c:
328         colhash = c['portable_data_hash']
329     else:
330         colhash = c['uuid']
331     dstcol = dst.collections().list(
332         filters=[['portable_data_hash', '=', colhash]]
333     ).execute()
334     if dstcol['items_available'] > 0:
335         logger.info("Skipping collection %s (already at dst)", obj_uuid)
336         return dstcol['items'][0]
337
338     logger.info("Copying collection %s", obj_uuid)
339
340     # Fetch the collection's manifest.
341     manifest = c['manifest_text']
342
343     # Enumerate the block locators found in the manifest.
344     collection_blocks = set()
345     src_keep = arvados.keep.KeepClient(src)
346     for line in manifest.splitlines():
347         try:
348             block_hash = line.split()[1]
349             collection_blocks.add(block_hash)
350         except ValueError:
351             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
352
353     # Copy each block from src_keep to dst_keep.
354     dst_keep = arvados.keep.KeepClient(dst)
355     for locator in collection_blocks:
356         parts = locator.split('+')
357         logger.info("Copying block %s (%s bytes)", locator, parts[1])
358         data = src_keep.get(locator)
359         dst_keep.put(data)
360
361     # Copy the manifest and save the collection.
362     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
363     dst_keep.put(manifest)
364
365     if 'uuid' in c:
366         del c['uuid']
367     if 'owner_uuid' in c:
368         del c['owner_uuid']
369     c['ensure_unique_name'] = True
370     return dst.collections().create(body=c).execute()
371
372 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
373 #
374 #    Copies commits from git repository 'src_git_repo' on Arvados
375 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
376 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
377 #    or "jsmith")
378 #
379 #    All commits will be copied to a destination branch named for the
380 #    source repository URL.
381 #
382 #    Because users cannot create their own repositories, the
383 #    destination repository must already exist.
384 #
385 #    The user running this command must be authenticated
386 #    to both repositories.
387 #
388 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
389     # Identify the fetch and push URLs for the git repositories.
390     r = src.repositories().list(
391         filters=[['name', '=', src_git_repo]]).execute()
392     if r['items_available'] != 1:
393         raise Exception('cannot identify source repo {}; {} repos found'
394                         .format(src_git_repo, r['items_available']))
395     src_git_url = r['items'][0]['fetch_url']
396     logger.debug('src_git_url: {}'.format(src_git_url))
397
398     r = dst.repositories().list(
399         filters=[['name', '=', dst_git_repo]]).execute()
400     if r['items_available'] != 1:
401         raise Exception('cannot identify destination repo {}; {} repos found'
402                         .format(dst_git_repo, r['items_available']))
403     dst_git_push_url  = r['items'][0]['push_url']
404     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
405
406     dst_branch = re.sub(r'\W+', '_', src_git_url)
407
408     # Copy git commits from src repo to dst repo (but only if
409     # we have not already copied this repo in this session).
410     #
411     if src_git_repo in local_repo_dir:
412         logger.debug('already copied src repo %s, skipping', src_git_repo)
413     else:
414         tmprepo = tempfile.mkdtemp()
415         local_repo_dir[src_git_repo] = tmprepo
416         arvados.util.run_command(
417             ["git", "clone", src_git_url, tmprepo],
418             cwd=os.path.dirname(tmprepo))
419         arvados.util.run_command(
420             ["git", "checkout", "-b", dst_branch],
421             cwd=tmprepo)
422         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
423         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
424
425 # git_rev_parse(rev, repo)
426 #
427 #    Returns the 40-character commit hash corresponding to 'rev' in
428 #    git repository 'repo' (which must be the path of a local git
429 #    repository)
430 #
431 def git_rev_parse(rev, repo):
432     gitout, giterr = arvados.util.run_command(
433         ['git', 'rev-parse', rev], cwd=repo)
434     return gitout.strip()
435
436 # uuid_type(api, object_uuid)
437 #
438 #    Returns the name of the class that object_uuid belongs to, based on
439 #    the second field of the uuid.  This function consults the api's
440 #    schema to identify the object class.
441 #
442 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
443 #
444 #    Special case: if handed a Keep locator hash, return 'Collection'.
445 #
446 def uuid_type(api, object_uuid):
447     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
448         return 'Collection'
449     p = object_uuid.split('-')
450     if len(p) == 3:
451         type_prefix = p[1]
452         for k in api._schema.schemas:
453             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
454             if type_prefix == obj_class:
455                 return k
456     return None
457
458 def abort(msg, code=1):
459     print >>sys.stderr, "arv-copy:", msg
460     exit(code)
461
462 if __name__ == '__main__':
463     main()