X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bdb92619b5f6d920119b8c32c3027cf4b751ed16..954b101b542b59ba76885c123a64b9fbf55b3c41:/sdk/python/arvados/commands/arv_copy.py

diff --git a/sdk/python/arvados/commands/arv_copy.py b/sdk/python/arvados/commands/arv_copy.py
index 1f72635406..5c5192860c 100755
--- a/sdk/python/arvados/commands/arv_copy.py
+++ b/sdk/python/arvados/commands/arv_copy.py
@@ -33,6 +33,7 @@ import arvados.keep
 import arvados.util
 import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
+import ruamel.yaml as yaml

 from arvados.api import OrderedJsonModel
 from arvados._version import __version__
@@ -112,7 +113,7 @@ def main():
     copy_opts.set_defaults(recursive=True)

     parser = argparse.ArgumentParser(
-        description='Copy a pipeline instance, template or collection from one Arvados instance to another.',
+        description='Copy a pipeline instance, template, workflow, or collection from one Arvados instance to another.',
         parents=[copy_opts, arv_cmd.retry_opt])
     args = parser.parse_args()

@@ -144,6 +145,9 @@ def main():
         set_src_owner_uuid(src_arv.pipeline_templates(), args.object_uuid, args)
         result = copy_pipeline_template(args.object_uuid,
                                         src_arv, dst_arv, args)
+    elif t == 'Workflow':
+        set_src_owner_uuid(src_arv.workflows(), args.object_uuid, args)
+        result = copy_workflow(args.object_uuid, src_arv, dst_arv, args)
     else:
         abort("cannot copy object {} of type {}".format(args.object_uuid, t))

@@ -405,6 +409,64 @@ def copy_pipeline_template(pt_uuid, src, dst, args):
     return dst.pipeline_templates().create(body=pt, ensure_unique_name=True).execute(num_retries=args.retries)

+# copy_workflow(wf_uuid, src, dst, args)
+#
+#    Copies a workflow identified by wf_uuid from src to dst.
+#
+#    If args.recursive is True, also copy any collections
+#      referenced in the workflow definition yaml.
+#
+#    The owner_uuid of the new workflow is set to any given
+#      project_uuid or the user who copied the template.
+#
+#    Returns the copied workflow object.
+#
+def copy_workflow(wf_uuid, src, dst, args):
+    # fetch the workflow from the source instance
+    wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
+
+    # copy collections and docker images
+    if args.recursive:
+        wf_def = yaml.safe_load(wf["definition"])
+        if wf_def is not None:
+            locations = []
+            docker_images = {}
+            graph = wf_def.get('$graph', None)
+            if graph is not None:
+                workflow_collections(graph, locations, docker_images)
+            else:
+                workflow_collections(wf_def, locations, docker_images)
+
+            if locations:
+                copy_collections(locations, src, dst, args)
+
+            for image in docker_images:
+                copy_docker_image(image, docker_images[image], src, dst, args)
+
+    # copy the workflow itself
+    del wf['uuid']
+    wf['owner_uuid'] = args.project_uuid
+    return dst.workflows().create(body=wf).execute(num_retries=args.retries)
+
+def workflow_collections(obj, locations, docker_images):
+    if isinstance(obj, dict):
+        loc = obj.get('location', None)
+        if loc is not None:
+            if loc.startswith("keep:"):
+                locations.append(loc[5:])
+
+        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
+        if docker_image is not None:
+            ds = docker_image.split(":", 1)
+            tag = ds[1] if len(ds)==2 else 'latest'
+            docker_images[ds[0]] = tag
+
+        for x in obj:
+            workflow_collections(obj[x], locations, docker_images)
+    elif isinstance(obj, list):
+        for x in obj:
+            workflow_collections(x, locations, docker_images)
+
 # copy_collections(obj, src, dst, args)
 #
 # Recursively copies all collections referenced by 'obj' from src
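
For reference, below is a minimal, self-contained sketch (not part of the patch) of what the workflow_collections() helper added above gathers from a CWL-style workflow definition. The sample definition, the Keep locator, and the image tag are hypothetical; only the traversal logic mirrors the function in the diff.

# Sketch only: a stand-alone rerun of the workflow_collections() traversal
# from the patch, applied to a hypothetical CWL-style definition.
def workflow_collections(obj, locations, docker_images):
    # Walk dicts and lists, recording "keep:" locations and Docker images.
    if isinstance(obj, dict):
        loc = obj.get('location', None)
        if loc is not None and loc.startswith("keep:"):
            locations.append(loc[5:])
        docker_image = obj.get('dockerImageId', None) or obj.get('dockerPull', None)
        if docker_image is not None:
            ds = docker_image.split(":", 1)
            docker_images[ds[0]] = ds[1] if len(ds) == 2 else 'latest'
        for x in obj:
            workflow_collections(obj[x], locations, docker_images)
    elif isinstance(obj, list):
        for x in obj:
            workflow_collections(x, locations, docker_images)

# Hypothetical definition, shaped like what copy_workflow() gets back from
# yaml.safe_load(wf["definition"]).
wf_def = {
    '$graph': [{
        'class': 'CommandLineTool',
        'hints': [{'class': 'DockerRequirement',
                   'dockerPull': 'arvados/jobs:1.4.0'}],
        'inputs': [{'id': 'script',
                    'default': {'class': 'File',
                                'location': 'keep:99999999999999999999999999999999+99/run.sh'}}],
    }],
}

locations = []
docker_images = {}
workflow_collections(wf_def.get('$graph'), locations, docker_images)
print(locations)      # ['99999999999999999999999999999999+99/run.sh']
print(docker_images)  # {'arvados/jobs': '1.4.0'}

In the patch, copy_workflow() passes the gathered locations to copy_collections() and each image/tag pair to copy_docker_image() before creating the workflow record on the destination cluster.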