X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4b34d2584324664897467bb902599938710c9650..34aac296f4a0d2df0e369a9169924ef7849d6e85:/sdk/python/arvados/commands/copy.py

diff --git a/sdk/python/arvados/commands/copy.py b/sdk/python/arvados/commands/copy.py
index 526304c33a..6c0c2a7699 100755
--- a/sdk/python/arvados/commands/copy.py
+++ b/sdk/python/arvados/commands/copy.py
@@ -38,9 +38,10 @@ def main():
     parser.add_argument(
         '--recursive', dest='recursive', action='store_true',
-        help='Recursively add any objects that this object depends upon.')
+        help='Recursively copy any dependencies for this object. (default)')
     parser.add_argument(
-        '--no-recursive', dest='recursive', action='store_false')
+        '--no-recursive', dest='recursive', action='store_false',
+        help='Do not copy any dependencies. NOTE: if this option is given, the copied object will need to be updated manually in order to be functional.')
     parser.add_argument(
         '--dst-git-repo', dest='dst_git_repo',
         help='The name of the destination git repository.')
@@ -75,7 +76,9 @@ def main():
                                        recursive=args.recursive,
                                        src=src_arv, dst=dst_arv)
     elif t == 'PipelineTemplate':
-        result = copy_pipeline_template(args.object_uuid, src=src_arv, dst=dst_arv)
+        result = copy_pipeline_template(args.object_uuid,
+                                        dst_git_repo=args.dst_git_repo, recursive=args.recursive,
+                                        src=src_arv, dst=dst_arv)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))
@@ -107,50 +110,9 @@ def api_for_instance(instance_name):
         abort('need ARVADOS_API_HOST and ARVADOS_API_TOKEN for {}'.format(instance_name))
     return client

-# copy_collection(obj_uuid, src, dst)
-#
-# Copy the collection identified by obj_uuid from src to dst.
-#
-# For this application, it is critical to preserve the
-# collection's manifest hash, which is not guaranteed with the
-# arvados.CollectionReader and arvados.CollectionWriter classes.
-# Copying each block in the collection manually, followed by
-# the manifest block, ensures that the collection's manifest
-# hash will not change.
-#
-def copy_collection(obj_uuid, src=None, dst=None):
-    # Fetch the collection's manifest.
-    c = src.collections().get(uuid=obj_uuid).execute()
-    manifest = c['manifest_text']
-    logging.debug('copying collection %s', obj_uuid)
-    logging.debug('manifest_text = %s', manifest)
-
-    # Enumerate the block locators found in the manifest.
-    collection_blocks = sets.Set()
-    src_keep = arvados.keep.KeepClient(src)
-    for line in manifest.splitlines():
-        try:
-            block_hash = line.split()[1]
-            collection_blocks.add(block_hash)
-        except ValueError:
-            abort('bad manifest line in collection {}: {}'.format(obj_uuid, f))
-
-    # Copy each block from src_keep to dst_keep.
-    dst_keep = arvados.keep.KeepClient(dst)
-    for locator in collection_blocks:
-        data = src_keep.get(locator)
-        logger.debug('copying block %s', locator)
-        logger.info("Retrieved %d bytes", len(data))
-        dst_keep.put(data)
-
-    # Copy the manifest and save the collection.
-    logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
-    dst_keep.put(manifest)
-    return dst.collections().create(body={"manifest_text": manifest}).execute()
-
-# copy_pipeline_instance(obj_uuid, dst_git_repo, dst_project, recursive, src, dst)
+# copy_pipeline_instance(pi_uuid, dst_git_repo, dst_project, recursive, src, dst)
 #
-# Copies a pipeline instance identified by obj_uuid from src to dst.
+# Copies a pipeline instance identified by pi_uuid from src to dst.
 #
 # If the 'recursive' option evaluates to True:
 #  1. Copies all input collections
@@ -167,84 +129,171 @@ def copy_collection(obj_uuid, src=None, dst=None):
 #  3. The owner_uuid of the instance is changed to the user who
 #     copied it.
 #
-def copy_pipeline_instance(obj_uuid, dst_git_repo=None, dst_project=None, recursive=True, src=None, dst=None):
+def copy_pipeline_instance(pi_uuid, dst_git_repo=None, dst_project=None, recursive=True, src=None, dst=None):
     # Fetch the pipeline instance record.
-    pi = src.pipeline_instances().get(uuid=obj_uuid).execute()
+    pi = src.pipeline_instances().get(uuid=pi_uuid).execute()
+    pi['properties']['copied_from_pipeline_instance_uuid'] = pi_uuid

     if recursive:
-        # Copy input collections and docker images:
-        # For each component c in the pipeline, add any
-        # collection hashes found in c['job']['dependencies']
-        # and c['job']['docker_image_locator'].
-        #
-        input_collections = sets.Set()
-        for cname in pi['components']:
-            if 'job' not in pi['components'][cname]:
-                continue
-            job = pi['components'][cname]['job']
-            for dep in job['dependencies']:
-                input_collections.add(dep)
-            docker = job.get('docker_image_locator', None)
-            if docker:
-                input_collections.add(docker)
-
-        for c in input_collections:
-            copy_collection(c, src, dst)
-
-        # Copy the git repositorie(s)
-        repos = sets.Set()
-        for c in pi['components']:
-            component = pi['components'][c]
-            if 'repository' in component:
-                repos.add(component['repository'])
-            if 'job' in component and 'repository' in component['job']:
-                repos.add(component['job']['repository'])
-
-        for r in repos:
-            dst_branch = '{}_{}'.format(obj_uuid, r)
-            copy_git_repo(r, dst_git_repo, dst_branch, src, dst)
-
-        # Copy the pipeline template and save the uuid of the copy
-        new_pt = copy_pipeline_template(pi['pipeline_template_uuid'], src, dst)
-
-        # Update the fields of the pipeline instance
-        pi['properties']['copied_from_pipeline_instance_uuid'] = obj_uuid
-        pi['pipeline_template_uuid'] = new_pt
+        # Copy the pipeline template and save the copied template.
+        pt = copy_pipeline_template(pi['pipeline_template_uuid'],
+                                    dst_git_repo=dst_git_repo,
+                                    recursive=True, src=src, dst=dst)
+
+        # Copy input collections, docker images and git repos.
+        pi = copy_collections(pi, src, dst)
+        copy_git_repos(pi, dst_git_repo, src, dst)
+
+        # Update the fields of the pipeline instance with the copied
+        # pipeline template.
+        pi['pipeline_template_uuid'] = pt['uuid']
         if dst_project:
             pi['owner_uuid'] = dst_project
         else:
             del pi['owner_uuid']
-        # Rename the repositories named in the components to the dst_git_repo
-        for c in pi['components']:
-            component = pi['components'][c]
-            if 'repository' in component:
-                component['repository'] = dst_git_repo
-            if 'job' in component and 'repository' in component['job']:
-                component['job']['repository'] = dst_git_repo
     else:
         # not recursive
-        print >>sys.stderr, "Copying only pipeline instance {}.".format(obj_uuid)
+        print >>sys.stderr, "Copying only pipeline instance {}.".format(pi_uuid)
         print >>sys.stderr, "You are responsible for making sure all pipeline dependencies have been updated."

     # Create the new pipeline instance at the destination Arvados.
     new_pi = dst.pipeline_instances().create(pipeline_instance=pi).execute()
     return new_pi

-# copy_pipeline_template(obj_uuid, src, dst)
+# copy_pipeline_template(pt_uuid, recursive, dst_git_repo, src, dst)
+#
+# Copies a pipeline template identified by pt_uuid from src to dst.
 #
-# Copies a pipeline template identified by obj_uuid from src to dst.
+# If the 'recursive' option evaluates to True, also copy any collections,
+# docker images and git repositories that this template references.
 #
 # The owner_uuid of the new template is changed to that of the user
 # who copied the template.
 #
-def copy_pipeline_template(obj_uuid, src=None, dst=None):
+# Returns the copied pipeline template object.
+#
+def copy_pipeline_template(pt_uuid, recursive=True, dst_git_repo=None, src=None, dst=None):
     # fetch the pipeline template from the source instance
-    old_pt = src.pipeline_templates().get(uuid=obj_uuid).execute()
-    old_pt['name'] = old_pt['name'] + ' copy'
-    del old_pt['uuid']
-    del old_pt['owner_uuid']
-    return dst.pipeline_templates().create(body=old_pt).execute()
+    pt = src.pipeline_templates().get(uuid=pt_uuid).execute()
+
+    if recursive:
+        # Copy input collections, docker images and git repos.
+        pt = copy_collections(pt, src, dst)
+        copy_git_repos(pt, dst_git_repo, src, dst)
+
+    pt['name'] = pt['name'] + ' copy'
+    del pt['uuid']
+    del pt['owner_uuid']
+
+    return dst.pipeline_templates().create(body=pt).execute()
+
+# copy_collections(obj, src, dst)
+#
+# Recursively copies all collections referenced by 'obj' from src
+# to dst.
+#
+# Returns a copy of obj with any old collection uuids replaced by
+# the new ones.
+#
+def copy_collections(obj, src, dst):
+    if type(obj) == str:
+        if uuid_type(src, obj) == 'Collection':
+            newc = copy_collection(obj, src, dst)
+            if obj != newc['uuid'] and obj != newc['portable_data_hash']:
+                return newc['uuid']
+        return obj
+    elif type(obj) == dict:
+        return {v: copy_collections(obj[v], src, dst) for v in obj}
+    elif type(obj) == list:
+        return [copy_collections(v, src, dst) for v in obj]
+    return obj
+
+# copy_git_repos(p, dst_repo, src, dst)
+#
+# Copy all git repositories referenced by pipeline instance or
+# template 'p' from src to dst.
+#
+# p is updated in place: component 'repository' fields are renamed to dst_repo.
+# Git repository dependencies are identified by:
+#    * p['components'][c]['repository']
+#    * p['components'][c]['job']['repository']
+# for each component c in the pipeline.
+#
+def copy_git_repos(p, dst_repo, src=None, dst=None):
+    copied = set()
+    for c in p['components']:
+        component = p['components'][c]
+        if 'repository' in component:
+            repo = component['repository']
+            if repo not in copied:
+                dst_branch = p['uuid']
+                copy_git_repo(repo, dst_repo, dst_branch, src, dst)
+                copied.add(repo)
+            component['repository'] = dst_repo
+        if 'job' in component and 'repository' in component['job']:
+            repo = component['job']['repository']
+            if repo not in copied:
+                dst_branch = p['uuid']
+                copy_git_repo(repo, dst_repo, dst_branch, src, dst)
+                copied.add(repo)
+            component['job']['repository'] = dst_repo
+    return p
+
+# copy_collection(obj_uuid, src, dst)
+#
+# Copy the collection identified by obj_uuid from src to dst.
+# Returns the collection object created at dst.
+#
+# For this application, it is critical to preserve the
+# collection's manifest hash, which is not guaranteed with the
+# arvados.CollectionReader and arvados.CollectionWriter classes.
+# Copying each block in the collection manually, followed by
+# the manifest block, ensures that the collection's manifest
+# hash will not change.
+#
+def copy_collection(obj_uuid, src=None, dst=None):
+    c = src.collections().get(uuid=obj_uuid).execute()
+
+    # Check whether a collection with this hash already exists
+    # at the destination. If so, just return that collection.
+    if 'portable_data_hash' in c:
+        colhash = c['portable_data_hash']
+    else:
+        colhash = c['uuid']
+    dstcol = dst.collections().list(
+        filters=[['portable_data_hash', '=', colhash]]
+    ).execute()
+    if dstcol['items_available'] > 0:
+        return dstcol['items'][0]
+
+    # Fetch the collection's manifest.
+    manifest = c['manifest_text']
+    logging.debug('copying collection %s', obj_uuid)
+    logging.debug('manifest_text = %s', manifest)
+
+    # Enumerate the block locators found in the manifest.
+    collection_blocks = sets.Set()
+    src_keep = arvados.keep.KeepClient(src)
+    for line in manifest.splitlines():
+        try:
+            block_hash = line.split()[1]
+            collection_blocks.add(block_hash)
+        except ValueError:
+            abort('bad manifest line in collection {}: {}'.format(obj_uuid, line))
+
+    # Copy each block from src_keep to dst_keep.
+    dst_keep = arvados.keep.KeepClient(dst)
+    for locator in collection_blocks:
+        data = src_keep.get(locator)
+        logger.debug('copying block %s', locator)
+        logger.info("Retrieved %d bytes", len(data))
+        dst_keep.put(data)
+
+    # Copy the manifest and save the collection.
+    logger.debug('saving {} manifest: {}'.format(obj_uuid, manifest))
+    dst_keep.put(manifest)
+    return dst.collections().create(body={"manifest_text": manifest}).execute()

 # copy_git_repo(src_git_repo, dst_git_repo, dst_branch, src, dst)
 #
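
The walk-and-substitute pattern that copy_collections applies to pipeline records can be sketched in isolation. The snippet below is illustrative only; the replace_refs helper and the toy mapping are invented for this sketch and are not part of the diff, but the traversal over nested dicts, lists and strings mirrors what copy_collections does when it swaps old collection identifiers for new ones:

    # Walk a nested structure of dicts/lists/strings and substitute any
    # string that appears in `mapping`, leaving everything else untouched.
    def replace_refs(obj, mapping):
        if isinstance(obj, str):
            return mapping.get(obj, obj)
        elif isinstance(obj, dict):
            return {k: replace_refs(v, mapping) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [replace_refs(v, mapping) for v in obj]
        return obj

    record = {'components': {'foo': {'job': {'dependencies': ['old-collection-id']}}}}
    print(replace_refs(record, {'old-collection-id': 'new-collection-id'}))
    # {'components': {'foo': {'job': {'dependencies': ['new-collection-id']}}}}
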