X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cd4a811b896da640e5b8ddca7e515f19085932d4..ffae77ee85f7a6dd4a095298aaa9dba145c98bdd:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 71e499ebca..7664abef7c 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -31,8 +31,7 @@ import cwltool.workflow from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process, fill_in_defaults) from cwltool.load_tool import fetch_document -from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class -from cwltool.utils import aslist +from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class from cwltool.builder import substitute from cwltool.pack import pack from cwltool.update import INTERNAL_VERSION @@ -83,7 +82,7 @@ def find_defaults(d, op): for i in viewvalues(d): find_defaults(i, op) -def make_builder(joborder, hints, requirements, runtimeContext): +def make_builder(joborder, hints, requirements, runtimeContext, metadata): return Builder( job=joborder, files=[], # type: List[Dict[Text, Text]] @@ -106,6 +105,7 @@ def make_builder(joborder, hints, requirements, runtimeContext): outdir="", # type: Text tmpdir="", # type: Text stagedir="", # type: Text + cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion") ) def search_schemadef(name, reqs): @@ -172,7 +172,10 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov specs = [] primary["secondaryFiles"] = secondaryspec for i, sf in enumerate(aslist(secondaryspec)): - pattern = builder.do_eval(sf["pattern"], context=primary) + if builder.cwlVersion == "v1.0": + pattern = builder.do_eval(sf, context=primary) + else: + pattern = builder.do_eval(sf["pattern"], context=primary) if pattern is None: continue if isinstance(pattern, list): @@ -263,6 +266,8 @@ def upload_dependencies(arvrunner, name, document_loader, # that external references in $include and $mixin are captured. scanobj = loadref("", workflowobj["id"]) + metadata = scanobj + sc_result = scandeps(uri, scanobj, loadref_fields, set(("$include", "$schemas", "location")), @@ -354,7 +359,8 @@ def upload_dependencies(arvrunner, name, document_loader, builder = make_builder(builder_job_order, obj.get("hints", []), obj.get("requirements", []), - ArvRuntimeContext()) + ArvRuntimeContext(), + metadata) discover_secondary_files(arvrunner.fs_access, builder, obj["inputs"], @@ -421,7 +427,8 @@ def upload_dependencies(arvrunner, name, document_loader, if "$schemas" in workflowobj: sch = CommentedSeq() for s in workflowobj["$schemas"]: - sch.append(mapper.mapper(s).resolved) + if s in mapper: + sch.append(mapper.mapper(s).resolved) workflowobj["$schemas"] = sch return mapper @@ -437,9 +444,14 @@ def upload_docker(arvrunner, tool): # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid, + arvrunner.runtimeContext.force_docker_pull, + arvrunner.runtimeContext.tmp_outdir_prefix) else: - arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid) + arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__}, + True, arvrunner.project_uuid, + arvrunner.runtimeContext.force_docker_pull, + arvrunner.runtimeContext.tmp_outdir_prefix) elif isinstance(tool, cwltool.workflow.Workflow): for s in tool.steps: upload_docker(arvrunner, s.embedded_tool) @@ -472,7 +484,10 @@ def packed_workflow(arvrunner, tool, merged_map): if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles: v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]] if v.get("class") == "DockerRequirement": - v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid) + v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, + arvrunner.project_uuid, + arvrunner.runtimeContext.force_docker_pull, + arvrunner.runtimeContext.tmp_outdir_prefix) for l in v: visit(v[l], cur_id) if isinstance(v, list): @@ -516,7 +531,8 @@ def upload_job_order(arvrunner, name, tool, job_order): builder = make_builder(builder_job_order, tool.hints, tool.requirements, - ArvRuntimeContext()) + ArvRuntimeContext(), + tool.metadata) # Now update job_order with secondaryFiles discover_secondary_files(arvrunner.fs_access, builder, @@ -576,7 +592,9 @@ def arvados_jobs_image(arvrunner, img): """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.""" try: - return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid) + return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid, + arvrunner.runtimeContext.force_docker_pull, + arvrunner.runtimeContext.tmp_outdir_prefix) except Exception as e: raise Exception("Docker image %s is not available\n%s" % (img, e) )