import arvados.collection
from .util import collectionUUID
import ruamel.yaml as yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from . context import ArvRuntimeContext
# set secondaryFiles, may be inherited by compound types.
secondaryspec = inputschema["secondaryFiles"]
- if isinstance(inputschema["type"], (Mapping, Sequence)):
+ if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+ not isinstance(inputschema["type"], basestring)):
# compound type (union, array, record)
set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
#
# Found a file, check for secondaryFiles
#
- primary["secondaryFiles"] = []
+ specs = []
+ primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
pattern = builder.do_eval(sf["pattern"], context=primary)
if pattern is None:
continue
+ if isinstance(pattern, list):
+ specs.extend(pattern)
+ elif isinstance(pattern, dict):
+ specs.append(pattern)
+ elif isinstance(pattern, str):
+ specs.append({"pattern": pattern})
+ else:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Expression must return list, object, string or null")
+
+ found = []
+ for i, sf in enumerate(specs):
+ if isinstance(sf, dict):
+ if sf.get("class") == "File":
+ pattern = sf["basename"]
+ else:
+ pattern = sf["pattern"]
+ required = sf.get("required")
+ elif isinstance(sf, str):
+ pattern = sf
+ required = True
+ else:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Expression must return list, object, string or null")
+
sfpath = substitute(primary["location"], pattern)
- required = builder.do_eval(sf["required"], context=primary)
+ required = builder.do_eval(required, context=primary)
if fsaccess.exists(sfpath):
- primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+ found.append({"location": sfpath, "class": "File"})
elif required:
raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
"Required secondary file '%s' does not exist" % sfpath)
- primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+ primary["secondaryFiles"] = cmap(found)
if discovered is not None:
discovered[primary["location"]] = primary["secondaryFiles"]
elif inputschema["type"] not in primitive_types_set:
if isinstance(primary, (Mapping, Sequence)):
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
-collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
-collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
-
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run,
include_primary=True, discovered_secondaryfiles=None):
builder_job_order,
discovered)
- visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+ copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+ visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
for d in list(discovered):
# Only interested in discovered secondaryFiles which are local
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
if "$schemas" in workflowobj:
- sch = []
+ sch = CommentedSeq()
for s in workflowobj["$schemas"]:
sch.append(mapper.mapper(s).resolved)
workflowobj["$schemas"] = sch
A "packed" workflow is one where all the components have been combined into a single document."""
rewrites = {}
- packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
- tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+ packed = pack(arvrunner.loadingContext, tool.tool["id"],
+ rewrite_out=rewrites,
+ loader=tool.doc_loader)
rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
def visit(v, cur_id):
if isinstance(v, dict):
if v.get("class") in ("CommandLineTool", "Workflow"):
- if "id" not in v:
- raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
- cur_id = rewrite_to_orig.get(v["id"], v["id"])
+ if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+ raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
+ if "id" in v:
+ cur_id = rewrite_to_orig.get(v["id"], v["id"])
+ if "path" in v and "location" not in v:
+ v["location"] = v["path"]
+ del v["path"]
if "location" in v and not v["location"].startswith("keep:"):
v["location"] = merged_map[cur_id].resolved[v["location"]]
if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
# Make a copy of the job order and set defaults.
builder_job_order = copy.copy(job_order)
- fill_in_defaults(tool.tool["inputs"],
+
+ # fill_in_defaults raises an error if any required
+ # parameters are missing. We don't want that here,
+ # so make every input optional first.
+ inputs_copy = copy.deepcopy(tool.tool["inputs"])
+ for i in inputs_copy:
+ if "null" not in i["type"]:
+ i["type"] = ["null"] + aslist(i["type"])
+
+ fill_in_defaults(inputs_copy,
builder_job_order,
arvrunner.fs_access)
# Need to create a builder object to evaluate expressions.
"""Base class for runner processes, which submit an instance of
arvados-cwl-runner and wait for the final result."""
- def __init__(self, runner, tool, loadingContext, enable_reuse,
+ def __init__(self, runner, updated_tool,
+ tool, loadingContext, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
intermediate_output_ttl=0, merged_map=None,
collection_cache_is_default=True):
loadingContext = loadingContext.copy()
- loadingContext.metadata = loadingContext.metadata.copy()
- loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+ loadingContext.metadata = updated_tool.metadata.copy()
- super(Runner, self).__init__(tool.tool, loadingContext)
+ super(Runner, self).__init__(updated_tool.tool, loadingContext)
self.arvrunner = runner
self.embedded_tool = tool