X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/215d15bc03a38be1965a5d9df21417a3f7eae032..6e5b24e817a0972ab30c9065cfc6a726821e7b66:/sdk/cwl/arvados_cwl/runner.py

diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 9385bde63c..b10f02d140 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -5,6 +5,7 @@
 from future import standard_library
 standard_library.install_aliases()
 from future.utils import viewvalues, viewitems
+from past.builtins import basestring
 
 import os
 import sys
@@ -13,8 +14,10 @@ import urllib.parse
 from functools import partial
 import logging
 import json
+import copy
 from collections import namedtuple
 from io import StringIO
+from typing import Mapping, Sequence
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -25,22 +28,27 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+                             shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
 from .util import collectionUUID
 import ruamel.yaml as yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
+from .context import ArvRuntimeContext
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -75,23 +83,147 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def setSecondary(t, fileobj, discovered):
-    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-        if "secondaryFiles" not in fileobj:
-            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
-            if discovered is not None:
-                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
-    elif isinstance(fileobj, list):
-        for e in fileobj:
-            setSecondary(t, e, discovered)
-
-def discover_secondary_files(inputs, job_order, discovered=None):
-    for t in inputs:
-        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-            setSecondary(t, job_order[shortname(t["id"])], discovered)
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
+    return Builder(
+        job=joborder,
+        files=[],                      # type: List[Dict[Text, Text]]
+        bindings=[],                   # type: List[Dict[Text, Any]]
+        schemaDefs={},                 # type: Dict[Text, Dict[Text, Any]]
+        names=None,                    # type: Names
+        requirements=requirements,     # type: List[Dict[Text, Any]]
+        hints=hints,                   # type: List[Dict[Text, Any]]
+        resources={},                  # type: Dict[str, int]
+        mutation_manager=None,         # type: Optional[MutationManager]
+        formatgraph=None,              # type: Optional[Graph]
+        make_fs_access=None,           # type: Type[StdFsAccess]
+        fs_access=None,                # type: StdFsAccess
+        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
+        timeout=runtimeContext.eval_timeout,                     # type: float
+        debug=runtimeContext.debug,                              # type: bool
+        js_console=runtimeContext.js_console,                    # type: bool
+        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
+        loadListing="",                # type: Text
+        outdir="",                     # type: Text
+        tmpdir="",                     # type: Text
+        stagedir="",                   # type: Text
+        cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
+    )
+
+def search_schemadef(name, reqs):
+    for r in reqs:
+        if r["class"] == "SchemaDefRequirement":
+            for sd in r["types"]:
+                if sd["name"] == name:
+                    return sd
+    return None
+
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+                                 "float", "double", "string", "record",
+                                 "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
+    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+        # union type, collect all possible secondaryFiles
+        for i in inputschema:
+            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+        return
+
+    if isinstance(inputschema, basestring):
+        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+        if sd:
+            inputschema = sd
+        else:
+            return
 
-collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
-collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
+    if "secondaryFiles" in inputschema:
+        # set secondaryFiles, may be inherited by compound types.
+        secondaryspec = inputschema["secondaryFiles"]
+
+    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+        not isinstance(inputschema["type"], basestring)):
+        # compound type (union, array, record)
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+    elif (inputschema["type"] == "record" and
+          isinstance(primary, Mapping)):
+        #
+        # record type, find secondary files associated with fields.
+        #
+        for f in inputschema["fields"]:
+            p = primary.get(shortname(f["name"]))
+            if p:
+                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "array" and
+          isinstance(primary, Sequence)):
+        #
+        # array type, find secondary files of elements
+        #
+        for p in primary:
+            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "File" and
+          secondaryspec and
+          isinstance(primary, Mapping) and
+          primary.get("class") == "File" and
+          "secondaryFiles" not in primary):
+        #
+        # Found a file, check for secondaryFiles
+        #
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
+        for i, sf in enumerate(aslist(secondaryspec)):
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
+            if pattern is None:
+                continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                specs.append({"pattern": pattern})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = sf["basename"]
+                else:
+                    pattern = sf["pattern"]
+                required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+            sfpath = substitute(primary["location"], pattern)
+            required = builder.do_eval(required, context=primary)
+
+            if fsaccess.exists(sfpath):
+                found.append({"location": sfpath, "class": "File"})
+            elif required:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Required secondary file '%s' does not exist" % sfpath)
+
+        primary["secondaryFiles"] = cmap(found)
+        if discovered is not None:
+            discovered[primary["location"]] = primary["secondaryFiles"]
+    elif inputschema["type"] not in primitive_types_set:
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+    for inputschema in inputs:
+        primary = job_order.get(shortname(inputschema["id"]))
+        if isinstance(primary, (Mapping, Sequence)):
+            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -130,11 +262,13 @@ def upload_dependencies(arvrunner, name, document_loader,
         loadref_fields = set(("$import",))
 
     scanobj = workflowobj
-    if "id" in workflowobj:
+    if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
         # Need raw file content (before preprocessing) to ensure
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
                          loadref_fields,
                          set(("$include", "$schemas", "location")),
@@ -219,11 +353,23 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     discovered = {}
     def discover_default_secondary_files(obj):
-        discover_secondary_files(obj["inputs"],
-                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+        builder_job_order = {}
+        for t in obj["inputs"]:
+            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+        # Need to create a builder object to evaluate expressions.
+        builder = make_builder(builder_job_order,
+                               obj.get("hints", []),
+                               obj.get("requirements", []),
+                               ArvRuntimeContext(),
+                               metadata)
+        discover_secondary_files(arvrunner.fs_access,
+                                 builder,
+                                 obj["inputs"],
+                                 builder_job_order,
                                  discovered)
 
-    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
     for d in list(discovered):
         # Only interested in discovered secondaryFiles which are local
@@ -280,7 +426,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
        for s in workflowobj["$schemas"]:
             sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
@@ -312,17 +458,22 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
             if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
             if "location" in v and not v["location"].startswith("keep:"):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
@@ -354,7 +505,31 @@ def upload_job_order(arvrunner, name, tool, job_order):
     object with 'location' updated to the proper keep references.
     """
 
-    discover_secondary_files(tool.tool["inputs"], job_order)
+    # Make a copy of the job order and set defaults.
+    builder_job_order = copy.copy(job_order)
+
+    # fill_in_defaults throws an error if there are any
+    # missing required parameters, we don't want it to do that
+    # so make them all optional.
+    inputs_copy = copy.deepcopy(tool.tool["inputs"])
+    for i in inputs_copy:
+        if "null" not in i["type"]:
+            i["type"] = ["null"] + aslist(i["type"])
+
+    fill_in_defaults(inputs_copy,
+                     builder_job_order,
+                     arvrunner.fs_access)
+    # Need to create a builder object to evaluate expressions.
+    builder = make_builder(builder_job_order,
+                           tool.hints,
+                           tool.requirements,
+                           ArvRuntimeContext(),
+                           tool.metadata)
+    # Now update job_order with secondaryFiles
+    discover_secondary_files(arvrunner.fs_access,
+                             builder,
+                             tool.tool["inputs"],
+                             job_order)
 
     jobmapper = upload_dependencies(arvrunner,
                                     name,
@@ -443,7 +618,8 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, loadingContext, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
@@ -451,7 +627,10 @@ class Runner(Process):
                  collection_cache_size=256,
                  collection_cache_is_default=True):
 
-        super(Runner, self).__init__(tool.tool, loadingContext)
+        loadingContext = loadingContext.copy()
+        loadingContext.metadata = updated_tool.metadata.copy()
+
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool
@@ -474,6 +653,7 @@ class Runner(Process):
         self.intermediate_output_ttl = intermediate_output_ttl
         self.priority = priority
         self.secret_store = secret_store
+        self.enable_dev = loadingContext.enable_dev
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB
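
The pattern strings that set_secondary collects above are resolved against the primary file's
location with cwltool's substitute, where a leading "^" strips one extension before the rest
of the pattern is appended. A minimal standalone sketch of that rule; the logic mirrors
cwltool.builder.substitute, and the Keep locations are hypothetical examples:

    # Sketch of the secondaryFiles pattern rule used by set_secondary: a
    # leading "^" removes one extension from the primary location before
    # the remainder of the pattern is appended.
    def substitute(value, replace):
        if replace.startswith("^"):
            try:
                return substitute(value[0:value.rindex(".")], replace[1:])
            except ValueError:
                # No extension left to strip; append the remainder as-is.
                return value + replace.lstrip("^")
        return value + replace

    # Hypothetical Keep locations:
    assert substitute("keep:0f+1/reads.bam", ".bai") == "keep:0f+1/reads.bam.bai"
    assert substitute("keep:0f+1/reads.bam", "^.fa") == "keep:0f+1/reads.fa"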
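
upload_job_order widens every input type with "null" before calling fill_in_defaults, so that
a job order missing a required parameter does not raise during default-filling. A self-contained
sketch of that widening step, with a simplified aslist stand-in and hypothetical input records:

    def aslist(x):
        # Simplified stand-in for cwltool.utils.aslist.
        return x if isinstance(x, list) else [x]

    inputs_copy = [
        {"id": "#main/reads", "type": "File"},                  # required input
        {"id": "#main/min_len", "type": "int", "default": 20},  # has a default
    ]

    # Make every input optional so the defaults pass cannot fail on a
    # missing required parameter.
    for i in inputs_copy:
        if "null" not in i["type"]:
            i["type"] = ["null"] + aslist(i["type"])

    assert inputs_copy[0]["type"] == ["null", "File"]
    assert inputs_copy[1]["type"] == ["null", "int"]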
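
The two do_eval branches in set_secondary reflect the schema change between CWL v1.0, where
each secondaryFiles entry is a bare pattern string or expression, and CWL v1.1, where entries
are objects with a "pattern" and an optional "required" flag. Hypothetical examples of the
two forms:

    # CWL v1.0 style: bare pattern strings, evaluated directly.
    v1_0_secondary = [".bai", "^.dict"]

    # CWL v1.1 style: objects with "pattern" and optional "required";
    # only sf["pattern"] is evaluated.
    v1_1_secondary = [
        {"pattern": ".bai", "required": True},
        {"pattern": "^.dict", "required": False},
    ]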