3699: add 'force' argument to pipelines
[arvados.git] / sdk / python / arvados / commands / copy.py
1 #! /usr/bin/env python
2
3 # arv-copy [--recursive] [--no-recursive] object-uuid src dst
4 #
5 # Copies an object from Arvados instance src to instance dst.
6 #
7 # By default, arv-copy recursively copies any dependent objects
8 # necessary to make the object functional in the new instance
9 # (e.g. for a pipeline instance, arv-copy copies the pipeline
10 # template, input collection, docker images, git repositories). If
11 # --no-recursive is given, arv-copy copies only the single record
12 # identified by object-uuid.
13 #
14 # The user must have files $HOME/.config/arvados/{src}.conf and
15 # $HOME/.config/arvados/{dst}.conf with valid login credentials for
16 # instances src and dst.  If either of these files is not found,
17 # arv-copy will issue an error.
18
19 import argparse
20 import getpass
21 import os
22 import re
23 import shutil
24 import sys
25 import logging
26 import tempfile
27
28 import arvados
29 import arvados.config
30 import arvados.keep
31 import arvados.util
32
33 logger = logging.getLogger('arvados.arv-copy')
34
35 # local_repo_dir records which git repositories from the Arvados source
36 # instance have been checked out locally during this run, and to which
37 # directories.
38 # e.g. if repository 'twp' from src_arv has been cloned into
39 # /tmp/gitfHkV9lu44A then local_repo_dir['twp'] = '/tmp/gitfHkV9lu44A'
40 #
41 local_repo_dir = {}
42
43 def main():
44     parser = argparse.ArgumentParser(
45         description='Copy a pipeline instance from one Arvados instance to another.')
46
47     parser.add_argument(
48         '-v', '--verbose', dest='verbose', action='store_true',
49         help='Verbose output.')
50     parser.add_argument(
51         '-f', '--force', dest='force', action='store_true',
52         help='Perform copy even if the object appears to exist at the remote destination.')
53     parser.add_argument(
54         '--src', dest='source_arvados', required=True,
55         help='The name of the source Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
56     parser.add_argument(
57         '--dst', dest='destination_arvados', required=True,
58         help='The name of the destination Arvados instance (required). May be either a pathname to a config file, or the basename of a file in $HOME/.config/arvados/instance_name.conf.')
59     parser.add_argument(
60         '--recursive', dest='recursive', action='store_true',
61         help='Recursively copy any dependencies for this object. (default)')
62     parser.add_argument(
63         '--no-recursive', dest='recursive', action='store_false',
64         help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
65     parser.add_argument(
66         '--dst-git-repo', dest='dst_git_repo',
67         help='The name of the destination git repository. Required when copying a pipeline recursively.')
68     parser.add_argument(
69         '--project-uuid', dest='project_uuid',
70         help='The UUID of the project at the destination to which the pipeline should be copied.')
71     parser.add_argument(
72         'object_uuid',
73         help='The UUID of the object to be copied.')
74     parser.set_defaults(recursive=True)
75
76     args = parser.parse_args()
77
78     if args.verbose:
79         logger.setLevel(logging.DEBUG)
80     else:
81         logger.setLevel(logging.INFO)
82
83     # Create API clients for the source and destination instances
84     src_arv = api_for_instance(args.source_arvados)
85     dst_arv = api_for_instance(args.destination_arvados)
86
87     # Identify the kind of object we have been given, and begin copying.
88     t = uuid_type(src_arv, args.object_uuid)
89     if t == 'Collection':
90         result = copy_collection(args.object_uuid,
91                                  src_arv, dst_arv,
92                                  force=args.force)
93     elif t == 'PipelineInstance':
94         result = copy_pipeline_instance(args.object_uuid,
95                                         src_arv, dst_arv,
96                                         args.dst_git_repo,
97                                         dst_project=args.project_uuid,
98                                         recursive=args.recursive,
99                                         force=args.force)
100     elif t == 'PipelineTemplate':
101         result = copy_pipeline_template(args.object_uuid,
102                                         src_arv, dst_arv,
103                                         args.dst_git_repo,
104                                         recursive=args.recursive,
105                                         force=args.force)
106     else:
107         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
108
109     # Clean up any outstanding temp git repositories.
110     for d in local_repo_dir.values():
111         shutil.rmtree(d, ignore_errors=True)
112
113     # If no exception was thrown and the response does not have an
114     # error_token field, presume success
115     if 'error_token' in result or 'uuid' not in result:
116         logger.error("API server returned an error result: {}".format(result))
117         exit(1)
118
119     logger.info("")
120     logger.info("Success: created copy with uuid {}".format(result['uuid']))
121     exit(0)
122
123 # api_for_instance(instance_name)
124 #
125 #     Creates an API client for the Arvados instance identified by
126 #     instance_name.
127 #
128 #     If instance_name contains a slash, it is presumed to be a path
129 #     (either local or absolute) to a file with Arvados configuration
130 #     settings.
131 #
132 #     Otherwise, it is presumed to be the name of a file in
133 #     $HOME/.config/arvados/instance_name.conf
134 #
135 def api_for_instance(instance_name):
136     if '/' in instance_name:
137         config_file = instance_name
138     else:
139         config_file = os.path.join(os.environ['HOME'], '.config', 'arvados', "{}.conf".format(instance_name))
140
141     try:
142         cfg = arvados.config.load(config_file)
143     except (IOError, OSError) as e:
144         abort(("Could not open config file {}: {}\n" +
145                "You must make sure that your configuration tokens\n" +
146                "for Arvados instance {} are in {} and that this\n" +
147                "file is readable.").format(
148                    config_file, e, instance_name, config_file))
149
150     if 'ARVADOS_API_HOST' in cfg and 'ARVADOS_API_TOKEN' in cfg:
151         api_is_insecure = (
152             cfg.get('ARVADOS_API_HOST_INSECURE', '').lower() in set(
153                 ['1', 't', 'true', 'y', 'yes']))
154         client = arvados.api('v1',
155                              host=cfg['ARVADOS_API_HOST'],
156                              token=cfg['ARVADOS_API_TOKEN'],
157                              insecure=api_is_insecure,
158                              cache=False)
159     else:
160         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
161     return client
162
163 # copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
164 #
165 #    Copies a pipeline instance identified by pi_uuid from src to dst.
166 #
167 #    If the 'recursive' option evaluates to True:
168 #      1. Copies all input collections
169 #           * For each component in the pipeline, include all collections
170 #             listed as job dependencies for that component)
171 #      2. Copy docker images
172 #      3. Copy git repositories
173 #      4. Copy the pipeline template
174 #
175 #    The 'force' option is passed through to copy_collections.
176 #
177 #    The only changes made to the copied pipeline instance are:
178 #      1. The original pipeline instance UUID is preserved in
179 #         the 'properties' hash as 'copied_from_pipeline_instance_uuid'.
180 #      2. The pipeline_template_uuid is changed to the new template uuid.
181 #      3. The owner_uuid of the instance is changed to the user who
182 #         copied it.
183 #
184 def copy_pipeline_instance(pi_uuid, src, dst, dst_git_repo, dst_project=None, recursive=True, force=False):
185     # Fetch the pipeline instance record.
186     pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
187
188     if recursive:
189         if not dst_git_repo:
190             abort('--dst-git-repo is required when copying a pipeline recursively.')
191         # Copy the pipeline template and save the copied template.
192         if pi.get('pipeline_template_uuid', None):
193             pt = copy_pipeline_template(pi['pipeline_template_uuid'],
194                                         src, dst,
195                                         dst_git_repo,
196                                         recursive=True)
197
198         # Copy input collections, docker images and git repos.
199         pi = copy_collections(pi, src, dst, force)
200         copy_git_repos(pi, src, dst, dst_git_repo)
201
202         # Update the fields of the pipeline instance with the copied
203         # pipeline template.
204         if pi.get('pipeline_template_uuid', None):
205             pi['pipeline_template_uuid'] = pt['uuid']
206
207     else:
208         # not recursive
209         print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
210         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."
211
212     # Update the pipeline instance properties, and create the new
213     # instance at dst.
214     pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid
215     pi['description'] = "Pipeline copied from {}\n\n{}".format(
216         pi_uuid, pi.get('description', ''))
217     if dst_project:
218         pi['owner_uuid'] = dst_project
219     else:
220         del pi['owner_uuid']
221     del pi['uuid']
222     pi['ensure_unique_name'] = True
223
224     new_pi = dst.pipeline_instances().create(body=pi).execute()
225     return new_pi
226
227 # copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive)
228 #
229 #    Copies a pipeline template identified by pt_uuid from src to dst.
230 #
231 #    If the 'recursive' option evaluates to true, also copy any collections,
232 #    docker images and git repositories that this template references.
233 #
234 #    The owner_uuid of the new template is changed to that of the user
235 #    who copied the template.
236 #
237 #    Returns the copied pipeline template object.
238 #
239 def copy_pipeline_template(pt_uuid, src, dst, dst_git_repo, recursive=True, force=False):
240     # fetch the pipeline template from the source instance
241     pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
242
243     if recursive:
244         if not dst_git_repo:
245             abort('--dst-git-repo is required when copying a pipeline recursively.')
246         # Copy input collections, docker images and git repos.
247         pt = copy_collections(pt, src, dst, force)
248         copy_git_repos(pt, src, dst, dst_git_repo)
249
250     pt['description'] = "Pipeline template copied from {}\n\n{}".format(
251         pt_uuid, pt.get('description', ''))
252     pt['name'] = "{} copied from {}".format(pt.get('name', ''), pt_uuid)
253     pt['ensure_unique_name'] = True
254     del pt['uuid']
255     del pt['owner_uuid']
256
257     return dst.pipeline_templates().create(body=pt).execute()
258
259 # copy_collections(obj, src, dst)
260 #
261 #    Recursively copies all collections referenced by 'obj' from src
262 #    to dst.
263 #
264 #    Returns a copy of obj with any old collection uuids replaced by
265 #    the new ones.
266 #
267 def copy_collections(obj, src, dst, force=False):
268     if type(obj) in [str, unicode]:
269         if uuid_type(src, obj) == 'Collection':
270             newc = copy_collection(obj, src, dst, force)
271             if obj != newc['uuid'] and obj != newc['portable_data_hash']:
272                 return newc['uuid']
273         return obj
274     elif type(obj) == dict:
275         return {v: copy_collections(obj[v], src, dst, force) for v in obj}
276     elif type(obj) == list:
277         return [copy_collections(v, src, dst, force) for v in obj]
278     return obj
279
280 # copy_git_repos(p, src, dst, dst_repo)
281 #
282 #    Copies all git repositories referenced by pipeline instance or
283 #    template 'p' from src to dst.
284 #
285 #    For each component c in the pipeline:
286 #      * Copy git repositories named in c['repository'] and c['job']['repository'] if present
287 #      * Rename script versions:
288 #          * c['script_version']
289 #          * c['job']['script_version']
290 #          * c['job']['supplied_script_version']
291 #        to the commit hashes they resolve to, since any symbolic
292 #        names (tags, branches) are not preserved in the destination repo.
293 #
294 #    The pipeline object is updated in place with the new repository
295 #    names.  The return value is undefined.
296 #
297 def copy_git_repos(p, src, dst, dst_repo):
298     copied = set()
299     for c in p['components']:
300         component = p['components'][c]
301         if 'repository' in component:
302             repo = component['repository']
303             if repo not in copied:
304                 copy_git_repo(repo, src, dst, dst_repo)
305                 copied.add(repo)
306             component['repository'] = dst_repo
307             if 'script_version' in component:
308                 repo_dir = local_repo_dir[repo]
309                 component['script_version'] = git_rev_parse(component['script_version'], repo_dir)
310         if 'job' in component:
311             j = component['job']
312             if 'repository' in j:
313                 repo = j['repository']
314                 if repo not in copied:
315                     copy_git_repo(repo, src, dst, dst_repo)
316                     copied.add(repo)
317                 j['repository'] = dst_repo
318                 repo_dir = local_repo_dir[repo]
319                 if 'script_version' in j:
320                     j['script_version'] = git_rev_parse(j['script_version'], repo_dir)
321                 if 'supplied_script_version' in j:
322                     j['supplied_script_version'] = git_rev_parse(j['supplied_script_version'], repo_dir)
323
324 # copy_collection(obj_uuid, src, dst, force)
325 #
326 #    Copies the collection identified by obj_uuid from src to dst.
327 #    Returns the collection object created at dst.
328 #
329 #    If a collection with the desired portable_data_hash already
330 #    exists at dst, and the 'force' argument is False, copy_collection
331 #    returns the existing collection without copying any blocks.
332 #    Otherwise (if no collection exists or if 'force' is True)
333 #    copy_collection copies all of the collection data blocks from src
334 #    to dst.
335 #
336 #    For this application, it is critical to preserve the
337 #    collection's manifest hash, which is not guaranteed with the
338 #    arvados.CollectionReader and arvados.CollectionWriter classes.
339 #    Copying each block in the collection manually, followed by
340 #    the manifest block, ensures that the collection's manifest
341 #    hash will not change.
342 #
343 def copy_collection(obj_uuid, src, dst, force=False):
344     c = src.collections().get(uuid=obj_uuid).execute()
345
346     # If a collection with this hash already exists at the
347     # destination, and 'force' is not true, just return that
348     # collection.
349     if not force:
350         if 'portable_data_hash' in c:
351             colhash = c['portable_data_hash']
352         else:
353             colhash = c['uuid']
354         dstcol = dst.collections().list(
355             filters=[['portable_data_hash', '=', colhash]]
356         ).execute()
357         if dstcol['items_available'] > 0:
358             logger.info("Skipping collection %s (already at dst)", obj_uuid)
359             return dstcol['items'][0]
360
361     logger.info("Copying collection %s", obj_uuid)
362
363     # Fetch the collection's manifest.
364     manifest = c['manifest_text']
365
366     # Enumerate the block locators found in the manifest.
367     collection_blocks = set()
368     src_keep = arvados.keep.KeepClient(api_client=src, num_retries=2)
369     for line in manifest.splitlines():
370         for block_hash in line.split()[1:]:
371             if arvados.util.portable_data_hash_pattern.match(block_hash):
372                 collection_blocks.add(block_hash)
373             else:
374                 break
375
376     # Copy each block from src_keep to dst_keep.
377     dst_keep = arvados.keep.KeepClient(api_client=dst, num_retries=2)
378     for locator in collection_blocks:
379         parts = locator.split('+')
380         logger.info("Copying block %s (%s bytes)", locator, parts[1])
381         data = src_keep.get(locator)
382         dst_keep.put(data)
383
384     # Copy the manifest and save the collection.
385     logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
386     dst_keep.put(manifest)
387
388     if 'uuid' in c:
389         del c['uuid']
390     if 'owner_uuid' in c:
391         del c['owner_uuid']
392     c['ensure_unique_name'] = True
393     return dst.collections().create(body=c).execute()
394
395 # copy_git_repo(src_git_repo, src, dst, dst_git_repo)
396 #
397 #    Copies commits from git repository 'src_git_repo' on Arvados
398 #    instance 'src' to 'dst_git_repo' on 'dst'.  Both src_git_repo
399 #    and dst_git_repo are repository names, not UUIDs (i.e. "arvados"
400 #    or "jsmith")
401 #
402 #    All commits will be copied to a destination branch named for the
403 #    source repository URL.
404 #
405 #    Because users cannot create their own repositories, the
406 #    destination repository must already exist.
407 #
408 #    The user running this command must be authenticated
409 #    to both repositories.
410 #
411 def copy_git_repo(src_git_repo, src, dst, dst_git_repo):
412     # Identify the fetch and push URLs for the git repositories.
413     r = src.repositories().list(
414         filters=[['name', '=', src_git_repo]]).execute()
415     if r['items_available'] != 1:
416         raise Exception('cannot identify source repo {}; {} repos found'
417                         .format(src_git_repo, r['items_available']))
418     src_git_url = r['items'][0]['fetch_url']
419     logger.debug('src_git_url: {}'.format(src_git_url))
420
421     r = dst.repositories().list(
422         filters=[['name', '=', dst_git_repo]]).execute()
423     if r['items_available'] != 1:
424         raise Exception('cannot identify destination repo {}; {} repos found'
425                         .format(dst_git_repo, r['items_available']))
426     dst_git_push_url  = r['items'][0]['push_url']
427     logger.debug('dst_git_push_url: {}'.format(dst_git_push_url))
428
429     dst_branch = re.sub(r'\W+', '_', src_git_url)
430
431     # Copy git commits from src repo to dst repo (but only if
432     # we have not already copied this repo in this session).
433     #
434     if src_git_repo in local_repo_dir:
435         logger.debug('already copied src repo %s, skipping', src_git_repo)
436     else:
437         tmprepo = tempfile.mkdtemp()
438         local_repo_dir[src_git_repo] = tmprepo
439         arvados.util.run_command(
440             ["git", "clone", src_git_url, tmprepo],
441             cwd=os.path.dirname(tmprepo))
442         arvados.util.run_command(
443             ["git", "checkout", "-b", dst_branch],
444             cwd=tmprepo)
445         arvados.util.run_command(["git", "remote", "add", "dst", dst_git_push_url], cwd=tmprepo)
446         arvados.util.run_command(["git", "push", "dst", dst_branch], cwd=tmprepo)
447
448 # git_rev_parse(rev, repo)
449 #
450 #    Returns the 40-character commit hash corresponding to 'rev' in
451 #    git repository 'repo' (which must be the path of a local git
452 #    repository)
453 #
454 def git_rev_parse(rev, repo):
455     gitout, giterr = arvados.util.run_command(
456         ['git', 'rev-parse', rev], cwd=repo)
457     return gitout.strip()
458
459 # uuid_type(api, object_uuid)
460 #
461 #    Returns the name of the class that object_uuid belongs to, based on
462 #    the second field of the uuid.  This function consults the api's
463 #    schema to identify the object class.
464 #
465 #    It returns a string such as 'Collection', 'PipelineInstance', etc.
466 #
467 #    Special case: if handed a Keep locator hash, return 'Collection'.
468 #
469 def uuid_type(api, object_uuid):
470     if re.match(r'^[a-f0-9]{32}\+[0-9]+(\+[A-Za-z0-9+-]+)?$', object_uuid):
471         return 'Collection'
472     p = object_uuid.split('-')
473     if len(p) == 3:
474         type_prefix = p[1]
475         for k in api._schema.schemas:
476             obj_class = api._schema.schemas[k].get('uuidPrefix', None)
477             if type_prefix == obj_class:
478                 return k
479     return None
480
481 def abort(msg, code=1):
482     print >>sys.stderr, "arv-copy:", msg
483     exit(code)
484
485 if __name__ == '__main__':
486     main()