X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/420b31d5f181d320e6f770cc3c63f2ae66908c88..f423aff73c1927a74e39c738e08bd6f1100a94c5:/sdk/cwl/arvados_cwl/runner.py diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 752cc32ba6..7bb66a158e 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -42,9 +42,10 @@ import schema_salad.validate as validate import arvados.collection from .util import collectionUUID import ruamel.yaml as yaml +from ruamel.yaml.comments import CommentedMap, CommentedSeq import arvados_cwl.arvdocker -from .pathmapper import ArvPathMapper, trim_listing +from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern from ._version import __version__ from . import done from . context import ArvRuntimeContext @@ -137,8 +138,9 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov # set secondaryFiles, may be inherited by compound types. secondaryspec = inputschema["secondaryFiles"] - if isinstance(inputschema["type"], Mapping): - # compound type (array or record) + if (isinstance(inputschema["type"], (Mapping, Sequence)) and + not isinstance(inputschema["type"], basestring)): + # compound type (union, array, record) set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered) elif (inputschema["type"] == "record" and @@ -173,7 +175,7 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov if pattern is None: continue sfpath = substitute(primary["location"], pattern) - required = builder.do_eval(sf["required"], context=primary) + required = builder.do_eval(sf.get("required"), context=primary) if fsaccess.exists(sfpath): primary["secondaryFiles"].append({"location": sfpath, "class": "File"}) @@ -193,9 +195,6 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No if isinstance(primary, (Mapping, Sequence)): set_secondary(fsaccess, builder, inputschema, None, primary, discovered) -collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$') -collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?') - def upload_dependencies(arvrunner, name, document_loader, workflowobj, uri, loadref_run, include_primary=True, discovered_secondaryfiles=None): @@ -336,7 +335,8 @@ def upload_dependencies(arvrunner, name, document_loader, builder_job_order, discovered) - visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files) + copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False) + visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files) for d in list(discovered): # Only interested in discovered secondaryFiles which are local @@ -393,7 +393,7 @@ def upload_dependencies(arvrunner, name, document_loader, discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d] if "$schemas" in workflowobj: - sch = [] + sch = CommentedSeq() for s in workflowobj["$schemas"]: sch.append(mapper.mapper(s).resolved) workflowobj["$schemas"] = sch @@ -425,8 +425,9 @@ def packed_workflow(arvrunner, tool, merged_map): A "packed" workflow is one where all the components have been combined into a single document.""" rewrites = {} - packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), - tool.tool["id"], tool.metadata, rewrite_out=rewrites) + packed = pack(arvrunner.loadingContext, tool.tool["id"], + rewrite_out=rewrites, + loader=tool.doc_loader) rewrite_to_orig = {v: k for k,v in viewitems(rewrites)} @@ -469,7 +470,16 @@ def upload_job_order(arvrunner, name, tool, job_order): # Make a copy of the job order and set defaults. builder_job_order = copy.copy(job_order) - fill_in_defaults(tool.tool["inputs"], + + # fill_in_defaults throws an error if there are any + # missing required parameters, we don't want it to do that + # so make them all optional. + inputs_copy = copy.deepcopy(tool.tool["inputs"]) + for i in inputs_copy: + if "null" not in i["type"]: + i["type"] = ["null"] + aslist(i["type"]) + + fill_in_defaults(inputs_copy, builder_job_order, arvrunner.fs_access) # Need to create a builder object to evaluate expressions. @@ -570,7 +580,8 @@ class Runner(Process): """Base class for runner processes, which submit an instance of arvados-cwl-runner and wait for the final result.""" - def __init__(self, runner, tool, loadingContext, enable_reuse, + def __init__(self, runner, updated_tool, + tool, loadingContext, enable_reuse, output_name, output_tags, submit_runner_ram=0, name=None, on_error=None, submit_runner_image=None, intermediate_output_ttl=0, merged_map=None, @@ -579,10 +590,9 @@ class Runner(Process): collection_cache_is_default=True): loadingContext = loadingContext.copy() - loadingContext.metadata = loadingContext.metadata.copy() - loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION + loadingContext.metadata = updated_tool.metadata.copy() - super(Runner, self).__init__(tool.tool, loadingContext) + super(Runner, self).__init__(updated_tool.tool, loadingContext) self.arvrunner = runner self.embedded_tool = tool