X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a6cf8e65c86a0002f0cf0c3a2d4092b67f9b57b..a89fbc8b4f2d8db8654175428bd1f041eed6f109:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index c8668afcac..f232178c5d 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -39,6 +39,7 @@ from cwltool.builder import Builder import schema_salad.validate as validate import arvados.collection +import arvados.util from .util import collectionUUID from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq @@ -258,7 +259,7 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No set_secondary(fsaccess, builder, inputschema, None, primary, discovered) def upload_dependencies(arvrunner, name, document_loader, - workflowobj, uri, loadref_run, + workflowobj, uri, loadref_run, runtimeContext, include_primary=True, discovered_secondaryfiles=None): """Upload the dependencies of the workflowobj document to Keep. @@ -418,10 +419,16 @@ def upload_dependencies(arvrunner, name, document_loader, single_collection=True, optional_deps=optional_deps) + keeprefs = set() + def addkeepref(k): + if k.startswith("keep:"): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) + def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -443,7 +450,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -458,6 +468,38 @@ def upload_dependencies(arvrunner, name, document_loader, for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. + already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", runtimeContext.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": runtimeContext.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: @@ -468,32 +510,36 @@ def upload_dependencies(arvrunner, name, document_loader, return mapper -def upload_docker(arvrunner, tool): +def upload_docker(arvrunner, tool, runtimeContext): """Uploads Docker images used in CommandLineTool objects.""" if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement") if docker_req: if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": - # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, - True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: - upload_docker(arvrunner, s.embedded_tool) + upload_docker(arvrunner, s.embedded_tool, runtimeContext) -def packed_workflow(arvrunner, tool, merged_map): +def packed_workflow(arvrunner, tool, merged_map, runtimeContext): """Create a packed workflow. A "packed" workflow is one where all the components have been combined into a single document.""" @@ -522,10 +568,11 @@ def packed_workflow(arvrunner, tool, merged_map): v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] if v.get("class") == "DockerRequirement": v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, - arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -546,7 +593,7 @@ def tag_git_version(packed): packed["http://schema.org/version"] = githash -def upload_job_order(arvrunner, name, tool, job_order): +def upload_job_order(arvrunner, name, tool, job_order, runtimeContext): """Upload local files referenced in the input object and return updated input object with 'location' updated to the proper keep references. """ @@ -582,7 +629,8 @@ def upload_job_order(arvrunner, name, tool, job_order): tool.doc_loader, job_order, job_order.get("id", "#"), - False) + False, + runtimeContext) if "id" in job_order: del job_order["id"] @@ -596,10 +644,10 @@ def upload_job_order(arvrunner, name, tool, job_order): FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"]) -def upload_workflow_deps(arvrunner, tool): +def upload_workflow_deps(arvrunner, tool, runtimeContext): # Ensure that Docker images needed by this workflow are available - upload_docker(arvrunner, tool) + upload_docker(arvrunner, tool, runtimeContext) document_loader = tool.doc_loader @@ -614,6 +662,7 @@ def upload_workflow_deps(arvrunner, tool): deptool, deptool["id"], False, + runtimeContext, include_primary=False, discovered_secondaryfiles=discovered_secondaryfiles) document_loader.idx[deptool["id"]] = deptool @@ -626,19 +675,22 @@ def upload_workflow_deps(arvrunner, tool): return merged_map -def arvados_jobs_image(arvrunner, img): +def arvados_jobs_image(arvrunner, img, runtimeContext): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, - arvrunner.runtimeContext.force_docker_pull, - arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, + True, + runtimeContext.project_uuid, + runtimeContext.force_docker_pull, + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) ) -def upload_workflow_collection(arvrunner, name, packed): +def upload_workflow_collection(arvrunner, name, packed, runtimeContext): collection = arvados.collection.Collection(api_client=arvrunner.api, keep_client=arvrunner.keep_client, num_retries=arvrunner.num_retries) @@ -647,15 +699,15 @@ def upload_workflow_collection(arvrunner, name, packed): filters = [["portable_data_hash", "=", collection.portable_data_hash()], ["name", "like", name+"%"]] - if arvrunner.project_uuid: - filters.append(["owner_uuid", "=", arvrunner.project_uuid]) + if runtimeContext.project_uuid: + filters.append(["owner_uuid", "=", runtimeContext.project_uuid]) exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries) if exists["items"]: logger.info("Using collection %s", exists["items"][0]["uuid"]) else: collection.save_new(name=name, - owner_uuid=arvrunner.project_uuid, + owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, num_retries=arvrunner.num_retries) logger.info("Uploaded to %s", collection.manifest_locator())