X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/504e09d413026fcac8ac94530134da2fce4dc0f2..66ef73b96e38c87a84ae64bd19385d5d5e9e07ed:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 2239e0f9df..f2517f9199 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -31,8 +31,7 @@ import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
@@ -41,7 +40,8 @@ import schema_salad.validate as validate
 
 import arvados.collection
 from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
@@ -82,7 +82,7 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -105,6 +105,8 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
                 )
 
 def search_schemadef(name, reqs):
@@ -126,6 +128,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
         return
 
+    if inputschema == "File":
+        inputschema = {"type": "File"}
+
     if isinstance(inputschema, basestring):
         sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
         if sd:
@@ -168,21 +173,63 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
-            sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+            if pattern is not None:
+                sfpath = substitute(primary["location"], pattern)
+
+            required = builder.do_eval(required, context=primary)
+
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
@@ -221,7 +268,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
@@ -236,10 +284,21 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    optional_deps = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$schemas",)),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
@@ -297,24 +356,14 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
 
@@ -327,7 +376,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
@@ -349,7 +399,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                           name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
 
     def setloc(p):
         loc = p.get("location")
@@ -392,9 +443,10 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
@@ -410,9 +462,16 @@ def upload_docker(arvrunner, tool):
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
         else:
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+                                                       True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
@@ -424,23 +483,33 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
-                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+                                                                                                             arvrunner.project_uuid,
+                                                                                                             arvrunner.runtimeContext.force_docker_pull,
+                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                                                                             arvrunner.runtimeContext.match_local_docker)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
@@ -484,7 +553,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
 
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
@@ -544,7 +614,10 @@ def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
+                                                          arvrunner.runtimeContext.force_docker_pull,
+                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                          arvrunner.runtimeContext.match_local_docker)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )
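The functional heart of this patch is the rewritten secondaryFiles handling in set_secondary(): builder.cwlVersion, now threaded through make_builder() via metadata, selects between CWL v1.0 semantics, where each secondaryFiles entry is a bare pattern string that is implicitly required, and v1.1+ semantics, where each entry is a {"pattern", "required"} object. Below is a minimal standalone sketch of that normalization step; the name normalize_secondary_specs is illustrative only and not part of the arvados_cwl API, and expression evaluation and Keep existence checks are omitted.

# Illustrative sketch: mirrors the first loop of the patched set_secondary(),
# minus builder.do_eval() and fsaccess.exists() handling.
def normalize_secondary_specs(cwl_version, secondaryspec):
    """Normalize secondaryFiles entries to {"pattern", "required"} dicts."""
    specs = []
    entries = secondaryspec if isinstance(secondaryspec, list) else [secondaryspec]
    for sf in entries:
        if cwl_version == "v1.0":
            # v1.0: a bare suffix pattern, always treated as required.
            specs.append({"pattern": sf, "required": True})
        else:
            # v1.1+: an object; "required" may be None and is resolved later.
            specs.append({"pattern": sf["pattern"], "required": sf.get("required")})
    return specs

print(normalize_secondary_specs("v1.0", [".bai"]))
# [{'pattern': '.bai', 'required': True}]
print(normalize_secondary_specs("v1.2", [{"pattern": "^.crai", "required": False}]))
# [{'pattern': '^.crai', 'required': False}]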