X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4aee7d57faff02fc6b7b6f750dc22a29e58bb963..6e5b24e817a0972ab30c9065cfc6a726821e7b66:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 912faf0e87..b10f02d140 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -42,9 +42,10 @@ import schema_salad.validate as validate
 import arvados.collection
 from .util import collectionUUID
 import ruamel.yaml as yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
 from .context import ArvRuntimeContext
@@ -82,7 +83,7 @@ def find_defaults(d, op):
         for i in viewvalues(d):
             find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -105,6 +106,7 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
                 )
 
 def search_schemadef(name, reqs):
@@ -168,21 +170,50 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                specs.append({"pattern": pattern})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = sf["basename"]
+                else:
+                    pattern = sf["pattern"]
+                required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
             sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                found.append({"location": sfpath, "class": "File"})
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
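The set_secondary() hunk above replaces the old single-pass loop with two passes: pass one evaluates each secondaryFiles entry (a bare pattern string under CWL v1.0, a {"pattern": ...} object under v1.1) and normalizes every result into a spec dict; pass two substitutes each pattern against the primary file's location and keeps only the files that exist. A minimal, self-contained sketch of that shape, not the patch itself; builder.do_eval(), SourceLine and cwltool's substitute() are simplified away, and every name below is illustrative:

    # Pass 1: coerce each evaluated pattern into a spec dict, mirroring the
    # list/dict/str/None cases the hunk above distinguishes.
    def normalize_specs(patterns):
        specs = []
        for pattern in patterns:
            if pattern is None:
                continue
            if isinstance(pattern, list):
                specs.extend(pattern)
            elif isinstance(pattern, dict):
                specs.append(pattern)
            elif isinstance(pattern, str):
                # Bare strings are required by default, as in the patch.
                specs.append({"pattern": pattern, "required": True})
            else:
                raise TypeError("Expression must return list, object, string or null")
        return specs

    # Simplified stand-in for cwltool's substitute(): each leading "^" strips
    # one extension from the primary path before appending the suffix.
    def substitute(path, pattern):
        while pattern.startswith("^"):
            path = path.rsplit(".", 1)[0]
            pattern = pattern[1:]
        return path + pattern

    # Pass 2: resolve each spec against the primary file and keep the hits.
    def resolve_secondaries(primary_location, specs, exists):
        found = []
        for sf in specs:
            sfpath = substitute(primary_location, sf["pattern"])
            if exists(sfpath):
                found.append({"location": sfpath, "class": "File"})
            elif sf.get("required"):
                raise FileNotFoundError("Required secondary file '%s' does not exist" % sfpath)
        return found

    specs = normalize_specs([".bai", {"pattern": "^.bai", "required": False}])
    print(resolve_secondaries("keep:abc+123/sample.bam", specs,
                              lambda p: p.endswith(".bam.bai")))
    # [{'location': 'keep:abc+123/sample.bam.bai', 'class': 'File'}]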
@@ -194,9 +225,6 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
         if isinstance(primary, (Mapping, Sequence)):
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
-collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
-collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
-
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
                         include_primary=True, discovered_secondaryfiles=None):
@@ -239,6 +267,8 @@ def upload_dependencies(arvrunner, name, document_loader,
     # that external references in $include and $mixin are captured.
     scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
                          loadref_fields,
                          set(("$include", "$schemas", "location")),
@@ -330,14 +360,16 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
                                  builder_job_order,
                                  discovered)
 
-    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
     for d in list(discovered):
         # Only interested in discovered secondaryFiles which are local
@@ -394,7 +426,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
             sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
@@ -426,17 +458,22 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
             if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field; add an 'id' or upgrade to cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
             if "location" in v and not v["location"].startswith("keep:"):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
@@ -486,7 +523,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
 
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
                              builder,
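Both make_builder() call sites above now thread document metadata through, and the new cwlVersion argument in the earlier hunk prefers the cwltool#original_cwlVersion annotation over the document's (possibly rewritten) cwlVersion field. A short sketch of that lookup, assuming only that cwltool records the pre-upgrade version under the key visible in the hunk; the helper name is illustrative:

    ORIGINAL = "http://commonwl.org/cwltool#original_cwlVersion"

    def effective_cwl_version(metadata):
        # The annotation left by cwltool's internal upgrade wins, so a
        # document authored as v1.0 is still treated as v1.0 even after
        # it has been rewritten to a newer internal version.
        return metadata.get(ORIGINAL) or metadata.get("cwlVersion")

    assert effective_cwl_version({"cwlVersion": "v1.1"}) == "v1.1"
    assert effective_cwl_version({ORIGINAL: "v1.0",
                                  "cwlVersion": "v1.1"}) == "v1.0"

This is what lets set_secondary() keep evaluating bare v1.0-style secondaryFiles patterns after a document has been upgraded in place.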
@@ -580,7 +618,8 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, loadingContext, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
@@ -589,10 +628,9 @@ class Runner(Process):
                  collection_cache_is_default=True):
 
         loadingContext = loadingContext.copy()
-        loadingContext.metadata = loadingContext.metadata.copy()
-        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+        loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(tool.tool, loadingContext)
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
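These last hunks change the Runner constructor contract: callers now pass both the internally upgraded tool, which supplies the tool body and metadata for the Process superclass, and the original tool, which is kept as self.embedded_tool, instead of patching loadingContext.metadata["cwlVersion"] to INTERNAL_VERSION by hand. A hypothetical call site; SomeRunner stands in for an actual Runner subclass and every argument is a placeholder:

    runner = SomeRunner(arvrunner,
                        updated_tool,   # upgraded copy; drives metadata
                        tool,           # original; becomes self.embedded_tool
                        loadingContext,
                        True,           # enable_reuse
                        output_name,
                        output_tags)

Keeping both copies presumably lets version-sensitive logic consult the upgraded metadata while the original document remains available for packing and submission.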