X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/29e3fe60d22218d122a1a3448a144bfcfd64785a..4af2a63dcd4460b11c8f9f05a51caf1954972c95:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index eae7a77896..10323c2be7 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -39,6 +39,7 @@ from cwltool.builder import Builder import schema_salad.validate as validate import arvados.collection +import arvados.util from .util import collectionUUID from ruamel.yaml import YAML from ruamel.yaml.comments import CommentedMap, CommentedSeq @@ -353,25 +354,14 @@ def upload_dependencies(arvrunner, name, document_loader, if include_primary and "id" in workflowobj: sc.append({"class": "File", "location": workflowobj["id"]}) - #if "$schemas" in workflowobj: - # for s in workflowobj["$schemas"]: - # sc.append({"class": "File", "location": s}) - def visit_default(obj): - #remove = [False] - def ensure_default_location(f): + def defaults_are_optional(f): if "location" not in f and "path" in f: f["location"] = f["path"] del f["path"] + normalizeFilesDirs(f) optional_deps.append(f) - #if "location" in f and not arvrunner.fs_access.exists(f["location"]): - # # Doesn't exist, remove from list of dependencies to upload - # sc[:] = [x for x in sc if x["location"] != f["location"]] - # # Delete "default" from workflowobj - # remove[0] = True - visit_class(obj["default"], ("File", "Directory"), ensure_default_location) - #if remove[0]: - # del obj["default"] + visit_class(obj["default"], ("File", "Directory"), defaults_are_optional) find_defaults(workflowobj, visit_default) @@ -403,8 +393,6 @@ def upload_dependencies(arvrunner, name, document_loader, else: del discovered[d] - print("NN", sc) - mapper = ArvPathMapper(arvrunner, sc, "", "keep:%s", "keep:%s/%s", @@ -412,12 +400,16 @@ def upload_dependencies(arvrunner, name, document_loader, single_collection=True, optional_deps=optional_deps) - print("whargh", mapper._pathmap) + keeprefs = set() + def addkeepref(k): + if k.startswith("keep:"): + keeprefs.add(collection_pdh_pattern.match(k).group(1)) def setloc(p): loc = p.get("location") if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")): p["location"] = mapper.mapper(p["location"]).resolved + addkeepref(p["location"]) return if not loc: @@ -439,7 +431,10 @@ def upload_dependencies(arvrunner, name, document_loader, gp = collection_uuid_pattern.match(loc) if not gp: + # Not a uuid pattern (must be a pdh pattern) + addkeepref(p["location"]) return + uuid = gp.groups()[0] if uuid not in uuid_map: raise SourceLine(p, "location", validate.ValidationException).makeError( @@ -454,6 +449,38 @@ def upload_dependencies(arvrunner, name, document_loader, for d in discovered: discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] + if arvrunner.runtimeContext.copy_deps: + # Find referenced collections and copy them into the + # destination project, for easy sharing. + already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list, + filters=[["portable_data_hash", "in", list(keeprefs)], + ["owner_uuid", "=", arvrunner.project_uuid]], + select=["uuid", "portable_data_hash", "created_at"])) + + keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present) + for kr in keeprefs: + col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]], + order="created_at desc", + select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"], + limit=1).execute() + if len(col["items"]) == 0: + logger.warning("Cannot find collection with portable data hash %s", kr) + continue + col = col["items"][0] + try: + arvrunner.api.collections().create(body={"collection": { + "owner_uuid": arvrunner.project_uuid, + "name": col["name"], + "description": col["description"], + "properties": col["properties"], + "portable_data_hash": col["portable_data_hash"], + "manifest_text": col["manifest_text"], + "storage_classes_desired": col["storage_classes_desired"], + "trash_at": col["trash_at"] + }}, ensure_unique_name=True).execute() + except Exception as e: + logger.warning("Unable copy collection to destination: %s", e) + if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: @@ -471,19 +498,21 @@ def upload_docker(arvrunner, tool): (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement") if docker_req: if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": - # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) else: arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -521,7 +550,8 @@ def packed_workflow(arvrunner, tool, merged_map): arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -629,7 +659,8 @@ def arvados_jobs_image(arvrunner, img): return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, arvrunner.runtimeContext.force_docker_pull, arvrunner.runtimeContext.tmp_outdir_prefix, - arvrunner.runtimeContext.match_local_docker) + arvrunner.runtimeContext.match_local_docker, + arvrunner.runtimeContext.copy_deps) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) )