From: Peter Amstutz Date: Tue, 3 May 2022 21:01:26 +0000 (-0400) Subject: 19070: Add --copy-deps/--no-copy-deps X-Git-Tag: 2.5.0~179^2~8 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/b5d0e0ad775eb822b9ab4bed5b57c2e9072f4c0b 19070: Add --copy-deps/--no-copy-deps Copies dependencies by default using --create-workflow and --update-workflow. Keeps old behavior by default when running from the command line (can get new behavior with --copy-deps). Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index c73b358ecc..3faa510a0a 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -217,6 +217,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.") exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.") + exgroup = parser.add_mutually_exclusive_group() + exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.") + exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave depenencies where they are.") + parser.add_argument( "--skip-schemas", action="store_true", diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index f75bde81e6..33b4c90c61 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -247,7 +247,8 @@ class ArvadosContainer(JobBase): runtimeContext.project_uuid, runtimeContext.force_docker_pull, runtimeContext.tmp_outdir_prefix, - runtimeContext.match_local_docker) + runtimeContext.match_local_docker, + runtimeContext.copy_deps) network_req, _ = 
self.get_requirement("NetworkAccess") if network_req: diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py index d5295afc23..cf0b3b9daf 100644 --- a/sdk/cwl/arvados_cwl/arvdocker.py +++ b/sdk/cwl/arvados_cwl/arvdocker.py @@ -57,7 +57,7 @@ def determine_image_id(dockerImageId): def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid, - force_pull, tmp_outdir_prefix, match_local_docker): + force_pull, tmp_outdir_prefix, match_local_docker, copy_deps): """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.""" if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement: @@ -85,10 +85,14 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid image_tag=image_tag, project_uuid=None) - images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, - image_name=image_name, - image_tag=image_tag, - project_uuid=project_uuid) + if copy_deps: + # Only images that are available in the destination project + images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3, + image_name=image_name, + image_tag=image_tag, + project_uuid=project_uuid) + else: + images = out_of_project_images if match_local_docker: local_image_id = determine_image_id(dockerRequirement["dockerImageId"]) diff --git a/sdk/cwl/arvados_cwl/context.py b/sdk/cwl/arvados_cwl/context.py index 316250106b..64f85e2076 100644 --- a/sdk/cwl/arvados_cwl/context.py +++ b/sdk/cwl/arvados_cwl/context.py @@ -38,6 +38,7 @@ class ArvRuntimeContext(RuntimeContext): self.collection_cache_size = 256 self.match_local_docker = False self.enable_preemptible = None + self.copy_deps = None super(ArvRuntimeContext, self).__init__(kwargs) diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index ef371b43df..ed57c6dae0 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -544,6 +544,16 @@ The 'jobs' API is no longer supported. 
if not runtimeContext.name: runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"]) + if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow): + runtimeContext.copy_deps = True + + if runtimeContext.update_workflow and self.project_uuid is None: + # If we are updating a workflow, make sure anything that + # gets uploaded goes into the same parent project, unless + # an alternate --project-uuid was provided. + existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() + self.project_uuid = existing_wf["owner_uuid"] + # Upload local file references in the job order. job_order = upload_job_order(self, "%s input" % runtimeContext.name, updated_tool, job_order) @@ -571,10 +581,6 @@ The 'jobs' API is no longer supported. else: tool = updated_tool - if runtimeContext.update_workflow and self.project_uuid is None: - existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute() - self.project_uuid = existing_wf["owner_uuid"] - # Upload direct dependencies of workflow steps, get back mapping of files to keep references. # Also uploads docker images. 
merged_map = upload_workflow_deps(self, tool) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index dae0541bb4..6b670c73df 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -39,6 +39,7 @@ from cwltool.builder import Builder import schema_salad.validate as validate import arvados.collection +import arvados.util from .util import collectionUUID from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq @@ -399,10 +400,15 @@ def upload_dependencies(arvrunner, name, document_loader, single_collection=True, optional_deps=optional_deps) + keeprefs = set() + def addkeepref(k): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) + def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -424,7 +430,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -439,6 +448,38 @@ def upload_dependencies(arvrunner, name, document_loader, for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if arvrunner.runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. 
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", arvrunner.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": arvrunner.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: @@ -462,13 +503,15 @@ def upload_docker(arvrunner, tool): arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - 
arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -506,7 +549,8 @@ def packed_workflow(arvrunner, tool, merged_map): arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -614,7 +658,8 @@ def arvados_jobs_image(arvrunner, img): return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) )