3699: added --force option
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31
32 logger = logging.getLogger('arvados.arv-copy')
33
34 # local_repo_dir records which git repositories from the Arvados source
35 # instance have been checked out locally during this run, and to which
36 # directories.
37 # e.g. if repository 'twp' from src_arv has been cloned into
38 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
39 #
40 local_repo_dir = {}
41
42 def main():
43     parser = argparse.ArgumentParser(
44         description='Copy a pipeline instance from one Arvados instance to another.')
45
46     parser.add_argument(
47         '-v', '--verbose', dest='verbose', action='store_true',
48         help='Verbose output.')
49     parser.add_argument(
50         '-f', '--force', dest='force', action='store_true',
51         help='Perform copy even if the object appears to exist at the remote destination.')
52     parser.add_argument(
53         '--src', dest='source_arvados', required=True,
54         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
55     parser.add_argument(
56         '--dst', dest='destination_arvados', required=True,
57         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
58     parser.add_argument(
59         '--recursive', dest='recursive', action='store_true',
60         help='Recursively copy any dependencies for this object. (default)')
61     parser.add_argument(
62         '--no-recursive', dest='recursive', action='store_false',
63         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
64     parser.add_argument(
65         '--dst-git-repo', dest='dst_git_repo',
66         help='The name of the destination git repository. Required when copying a pipeline recursively.')
67     parser.add_argument(
68         '--project-uuid', dest='project_uuid',
69         help='The UUID of the project at the destination to which the pipeline should be copied.')
70     parser.add_argument(
71         'object_uuid',
72         help='The UUID of the object to be copied.')
73     parser.set_defaults(recursive=True)
74
75     args = parser.parse_args()
76
77     if args.verbose:
78         logger.setLevel(logging.DEBUG)
79     else:
80         logger.setLevel(logging.INFO)
81
82     # Create API clients for the source and destination instances
83     src_arv = api_for_instance(args.source_arvados)
84     dst_arv = api_for_instance(args.destination_arvados)
85
86     # Identify the kind of object we have been given, and begin copying.
87     t = uuid_type(src_arv, args.object_uuid)
88     if t == 'Collection':
89         result = copy_collection(args.object_uuid,
90                                  src_arv, dst_arv,
91                                  force=args.force)
92     elif t == 'PipelineInstance':
93         result = copy_pipeline_instance(args.object_uuid,
94                                         src_arv, dst_arv,
95                                         args.dst_git_repo,
96                                         dst_project=args.project_uuid,
97                                         recursive=args.recursive)
98     elif t == 'PipelineTemplate':
99         result = copy_pipeline_template(args.object_uuid,
100                                         src_arv, dst_arv,
101                                         args.dst_git_repo,
102                                         recursive=args.recursive)
103     else:
104         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
105
106     # Clean up any outstanding temp git repositories.
107     for d in local_repo_dir.values():
108         shutil.rmtree(d, ignore_errors=True)
109
110     # If no exception was thrown and the response does not have an
111     # error_token field, presume success
112     if 'error_token' in result or 'uuid' not in result:
113         logger.error("API server returned an error result: {}".format(result))
114         exit(1)
115
116     logger.info("")
117     logger.info("Success: created copy with uuid {}".format(result['uuid']))
118     exit(0)
119
120 # api_for_instance(instance_name)
121 #
122 #     Creates an API client for the Arvados instance identified by
123 #     instance_name.
124 #
125 #     If instance_name contains a slash, it is presumed to be a path
126 #     (either local or absolute) to a file with Arvados configuration
127 #     settings.
128 #
129 #     Otherwise, it is presumed to be the name of a file in
130 #     $HOME/.config/arvados/instance_name.conf
131 #
132 def api_for_instance(instance_name):
133     if '/' in instance_name:
134         config_file = instance_name
135     else:
136         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
137
138     try:
139         cfg = arvados.config.load(config_file)
140     except (IOError, OSError) as e:
141         abort(("Could not open config file {}: {}\n" +
142                "You must make sure that your configuration tokens\n" +
143                "for Arvados instance {} are in {} and that this\n" +
144                "file is readable.").format(
145                    config_file, e, instance_name, config_file))
146
147     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
148         api_is_insecure = (
149             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
150                 ['1', 't', 'true', 'y', 'yes']))
151         client = arvados.api('v1',
152                              host=cfg['ARVADOS_API_HOST'],
153                              token=cfg['ARVADOS_API_TOKEN'],
154                              insecure=api_is_insecure,
155                              cache=False)
156     else:
157         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
158     return client
159
160 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
161 #
162 #    Copies a pipeline instance identified by pi_uuid from src to dst.
163 #
164 #    If the 'recursive' option evaluates to True:
165 #      1. Copies all input collections
166 #           * For each component in the pipeline, include all collections
167 #             listed as job dependencies for that component)
168 #      2. Copy docker images
169 #      3. Copy git repositories
170 #      4. Copy the pipeline template
171 #
172 #    The only changes made to the copied pipeline instance are:
173 #      1. The original pipeline instance UUID is preserved in
174 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
175 #      2. The pipeline_template_uuid is changed to the new template uuid.
176 #      3. The owner_uuid of the instance is changed to the user who
177 #         copied it.
178 #
179 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True):
180     # Fetch the pipeline instance record.
181     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
182
183     if recursive:
184         if not dst_git_repo:
185             abort('--dst-git-repo is required when copying a pipeline recursively.')
186         # Copy the pipeline template and save the copied template.
187         if pi.get('pipeline_template_uuid', None):
188             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
189                                         src, dst,
190                                         dst_git_repo,
191                                         recursive=True)
192
193         # Copy input collections, docker images and git repos.
194         pi = copy_collections(pi, src, dst)
195         copy_git_repos(pi, src, dst, dst_git_repo)
196
197         # Update the fields of the pipeline instance with the copied
198         # pipeline template.
199         if pi.get('pipeline_template_uuid', None):
200             pi['pipeline_template_uuid'] = pt['uuid']
201
202     else:
203         # not recursive
204         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
205         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
206
207     # Update the pipeline instance properties, and create the new
208     # instance at dst.
209     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
210     if dst_project:
211         pi['owner_uuid'] = dst_project
212     else:
213         del pi['owner_uuid']
214     del pi['uuid']
215     pi['ensure_unique_name'] = True
216
217     new_pi = dst.pipeline_instances().create(body=pi).execute()
218     return new_pi
219
220 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
221 #
222 #    Copies a pipeline template identified by pt_uuid from src to dst.
223 #
224 #    If the 'recursive' option evaluates to true, also copy any collections,
225 #    docker images and git repositories that this template references.
226 #
227 #    The owner_uuid of the new template is changed to that of the user
228 #    who copied the template.
229 #
230 #    Returns the copied pipeline template object.
231 #
232 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True):
233     # fetch the pipeline template from the source instance
234     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
235
236     if recursive:
237         if not dst_git_repo:
238             abort('--dst-git-repo is required when copying a pipeline recursively.')
239         # Copy input collections, docker images and git repos.
240         pt = copy_collections(pt, src, dst)
241         copy_git_repos(pt, src, dst, dst_git_repo)
242
243     pt['name'] = pt['name'] + ' copy'
244     pt['ensure_unique_name'] = True
245     del pt['uuid']
246     del pt['owner_uuid']
247
248     return dst.pipeline_templates().create(body=pt).execute()
249
250 # copy_collections(obj, src, dst)
251 #
252 #    Recursively copies all collections referenced by 'obj' from src
253 #    to dst.
254 #
255 #    Returns a copy of obj with any old collection uuids replaced by
256 #    the new ones.
257 #
258 def copy_collections(obj, src, dst):
259     if type(obj) in [str, unicode]:
260         if uuid_type(src, obj) == 'Collection':
261             newc = copy_collection(obj, src, dst)
262             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
263                 return newc['uuid']
264         return obj
265     elif type(obj) == dict:
266         return {v: copy_collections(obj[v], src, dst) for v in obj}
267     elif type(obj) == list:
268         return [copy_collections(v, src, dst) for v in obj]
269     return obj
270
271 # copy_git_repos(p, src, dst, dst_repo)
272 #
273 #    Copies all git repositories referenced by pipeline instance or
274 #    template 'p' from src to dst.
275 #
276 #    For each component c in the pipeline:
277 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
278 #      * Rename script versions:
279 #          * c['script_version']
280 #          * c['job']['script_version']
281 #          * c['job']['supplied_script_version']
282 #        to the commit hashes they resolve to, since any symbolic
283 #        names (tags, branches) are not preserved in the destination repo.
284 #
285 #    The pipeline object is updated in place with the new repository
286 #    names.  The return value is undefined.
287 #
288 def copy_git_repos(p, src, dst, dst_repo):
289     copied = set()
290     for c in p['components']:
291         component = p['components'][c]
292         if 'repository' in component:
293             repo = component['repository']
294             if repo not in copied:
295                 copy_git_repo(repo, src, dst, dst_repo)
296                 copied.add(repo)
297             component['repository'] = dst_repo
298             if 'script_version' in component:
299                 repo_dir = local_repo_dir[repo]
300                 component['script_version'] = git_rev_parse(component['script_version'], repo_dir)
301         if 'job' in component:
302             j = component['job']
303             if 'repository' in j:
304                 repo = j['repository']
305                 if repo not in copied:
306                     copy_git_repo(repo, src, dst, dst_repo)
307                     copied.add(repo)
308                 j['repository'] = dst_repo
309                 repo_dir = local_repo_dir[repo]
310                 if 'script_version' in j:
311                     j['script_version'] = git_rev_parse(j['script_version'], repo_dir)
312                 if 'supplied_script_version' in j:
313                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
314
315 # copy_collection(obj_uuid, src, dst)
316 #
317 #    Copies the collection identified by obj_uuid from src to dst.
318 #    Returns the collection object created at dst.
319 #
320 #    For this application, it is critical to preserve the
321 #    collection's manifest hash, which is not guaranteed with the
322 #    arvados.CollectionReader and arvados.CollectionWriter classes.
323 #    Copying each block in the collection manually, followed by
324 #    the manifest block, ensures that the collection's manifest
325 #    hash will not change.
326 #
327 def copy_collection(obj_uuid, src, dst, force=False):
328     c = src.collections().get(uuid=obj_uuid).execute()
329
330     # If a collection with this hash already exists at the
331     # destination, and 'force' is not true, just return that
332     # collection.
333     if not force:
334         if 'portable_data_hash' in c:
335             colhash = c['portable_data_hash']
336         else:
337             colhash = c['uuid']
338         dstcol = dst.collections().list(
339             filters=[['portable_data_hash', '=', colhash]]
340         ).execute()
341         if dstcol['items_available'] > 0:
342             logger.info("Skipping collection %s (already at dst)", obj_uuid)
343             return dstcol['items'][0]
344
345     logger.info("Copying collection %s", obj_uuid)
346
347     # Fetch the collection's manifest.
348     manifest = c['manifest_text']
349
350     # Enumerate the block locators found in the manifest.
351     collection_blocks = set()
352     src_keep = arvados.keep.KeepClient(src)
353     for line in manifest.splitlines():
354         try:
355             block_hash = line.split()[1]
356             collection_blocks.add(block_hash)
357         except ValueError:
358             abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
359
360     # Copy each block from src_keep to dst_keep.
361     dst_keep = arvados.keep.KeepClient(dst)
362     for locator in collection_blocks:
363         parts = locator.split('+')
364         logger.info("Copying block %s (%s bytes)", locator, parts[1])
365         data = src_keep.get(locator)
366         dst_keep.put(data)
367
368     # Copy the manifest and save the collection.
369     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
370     dst_keep.put(manifest)
371
372     if 'uuid' in c:
373         del c['uuid']
374     if 'owner_uuid' in c:
375         del c['owner_uuid']
376     c['ensure_unique_name'] = True
377     return dst.collections().create(body=c).execute()
378
379 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
380 #
381 #    Copies commits from git repository 'src_git_repo' on Arvados
382 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
383 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
384 #    or "jsmith")
385 #
386 #    All commits will be copied to a destination branch named for the
387 #    source repository URL.
388 #
389 #    Because users cannot create their own repositories, the
390 #    destination repository must already exist.
391 #
392 #    The user running this command must be authenticated
393 #    to both repositories.
394 #
395 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
396     # Identify the fetch and push URLs for the git repositories.
397     r = src.repositories().list(
398         filters=[['name', '=', src_git_repo]]).execute()
399     if r['items_available'] != 1:
400         raise Exception('cannot identify source repo {}; {} repos found'
401                         .format(src_git_repo, r['items_available']))
402     src_git_url = r['items'][0]['fetch_url']
403     logger.debug('src_git_url: {}'.format(src_git_url))
404
405     r = dst.repositories().list(
406         filters=[['name', '=', dst_git_repo]]).execute()
407     if r['items_available'] != 1:
408         raise Exception('cannot identify destination repo {}; {} repos found'
409                         .format(dst_git_repo, r['items_available']))
410     dst_git_push_url  = r['items'][0]['push_url']
411     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
412
413     dst_branch = re.sub(r'\W+', '_', src_git_url)
414
415     # Copy git commits from src repo to dst repo (but only if
416     # we have not already copied this repo in this session).
417     #
418     if src_git_repo in local_repo_dir:
419         logger.debug('already copied src repo %s, skipping', src_git_repo)
420     else:
421         tmprepo = tempfile.mkdtemp()
422         local_repo_dir[src_git_repo] = tmprepo
423         arvados.util.run_command(
424             ["git", "clone", src_git_url, tmprepo],
425             cwd=os.path.dirname(tmprepo))
426         arvados.util.run_command(
427             ["git", "checkout", "-b", dst_branch],
428             cwd=tmprepo)
429         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
430         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
431
432 # git_rev_parse(rev, repo)
433 #
434 #    Returns the 40-character commit hash corresponding to 'rev' in
435 #    git repository 'repo' (which must be the path of a local git
436 #    repository)
437 #
438 def git_rev_parse(rev, repo):
439     gitout, giterr = arvados.util.run_command(
440         ['git', 'rev-parse', rev], cwd=repo)
441     return gitout.strip()
442
443 # uuid_type(api, object_uuid)
444 #
445 #    Returns the name of the class that object_uuid belongs to, based on
446 #    the second field of the uuid.  This function consults the api's
447 #    schema to identify the object class.
448 #
449 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
450 #
451 #    Special case: if handed a Keep locator hash, return 'Collection'.
452 #
453 def uuid_type(api, object_uuid):
454     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
455         return 'Collection'
456     p = object_uuid.split('-')
457     if len(p) == 3:
458         type_prefix = p[1]
459         for k in api._schema.schemas:
460             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
461             if type_prefix == obj_class:
462                 return k
463     return None
464
465 def abort(msg, code=1):
466     print >>sys.stderr, "arv-copy:", msg
467     exit(code)
468
469 if __name__ == '__main__':
470     main()