from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
+from past.builtins import basestring
import os
import sys
import ruamel.yaml as yaml
import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
from ._version import __version__
from . import done
from . context import ArvRuntimeContext
stagedir="", # type: Text
)
def search_schemadef(name, reqs):
    """Look up a named type among SchemaDefRequirement entries.

    Args:
        name: the type name to find, compared against each definition's
            ``"name"`` key.
        reqs: iterable of requirement/hint mappings, each carrying a
            ``"class"`` key.  Only entries whose class is
            ``"SchemaDefRequirement"`` are searched, via their ``"types"``
            list.  Any iterable works, including a ``reversed(...)`` iterator.

    Returns:
        The first type definition mapping whose ``"name"`` equals *name*,
        in iteration order of *reqs*, or ``None`` when nothing matches.
    """
    for r in reqs:
        if r["class"] == "SchemaDefRequirement":
            for sd in r["types"]:
                if sd["name"] == name:
                    return sd
    return None
# Type names that set_secondary never tries to resolve as a named schema
# definition.  "File" is deliberately absent: File values are handled by their
# own branch in set_secondary.
primitive_types_set = frozenset(("null", "boolean", "int", "long",
                                 "float", "double", "string", "record",
                                 "array", "enum"))
def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
    """Fill in ``secondaryFiles`` on File values in *primary*, guided by *inputschema*.

    The schema and the value are walked together:

    * a list of schemas (union) is tried member by member;
    * a type given by name is resolved with ``search_schemadef`` against
      ``builder.hints + builder.requirements`` (searched in reverse order);
      an unknown name ends the walk for that branch;
    * record fields and array items are recursed into;
    * a ``"secondaryFiles"`` key on a schema replaces the inherited
      *secondaryspec* for everything beneath it.

    Args:
        fsaccess: object whose ``exists(path)`` reports whether a candidate
            secondary file is present.
        builder: provides ``do_eval`` for pattern/required expressions and
            the ``hints`` / ``requirements`` lists.
        inputschema: a schema mapping, a type name string, or a list of
            either (union).
        secondaryspec: secondaryFiles specification inherited from an
            enclosing schema, or ``None``.
        primary: the input value matching *inputschema*; mutated in place.
        discovered: optional mapping; when not ``None`` it receives
            ``primary["location"] -> secondaryFiles`` for every File that
            was processed.

    Raises:
        A validation error built through ``SourceLine(...).makeError`` when
        a spec evaluates as required and the file does not exist.
    """
    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
        # union type, collect all possible secondaryFiles
        for i in inputschema:
            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
        return

    if isinstance(inputschema, basestring):
        # Named type: later requirements take precedence, hence reversed().
        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
        if sd:
            inputschema = sd
        else:
            return

    if "secondaryFiles" in inputschema:
        # set secondaryFiles, may be inherited by compound types.
        secondaryspec = inputschema["secondaryFiles"]

    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
            not isinstance(inputschema["type"], basestring)):
        # compound type (union, array, record)
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)

    elif (inputschema["type"] == "record" and
          isinstance(primary, Mapping)):
        #
        # record type, find secondary files associated with fields.
        #
        for f in inputschema["fields"]:
            p = primary.get(shortname(f["name"]))
            if p:
                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)

    elif (inputschema["type"] == "array" and
          isinstance(primary, Sequence)):
        #
        # array type, find secondary files of elements
        #
        for p in primary:
            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)

    elif (inputschema["type"] == "File" and
          secondaryspec and
          isinstance(primary, Mapping) and
          primary.get("class") == "File" and
          "secondaryFiles" not in primary):
        #
        # Found a file, check for secondaryFiles
        #
        # Only Files without an explicit "secondaryFiles" key reach this
        # branch, so a caller-supplied list is never overwritten.
        primary["secondaryFiles"] = []
        for i, sf in enumerate(aslist(secondaryspec)):
            pattern = builder.do_eval(sf["pattern"], context=primary)
            if pattern is None:
                continue
            sfpath = substitute(primary["location"], pattern)
            required = builder.do_eval(sf.get("required"), context=primary)

            if fsaccess.exists(sfpath):
                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
            elif required:
                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                    "Required secondary file '%s' does not exist" % sfpath)

        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
        if discovered is not None:
            discovered[primary["location"]] = primary["secondaryFiles"]
    elif inputschema["type"] not in primitive_types_set:
        # Remaining non-primitive type name: retry it as a named schema.
        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Run ``set_secondary`` over every input parameter that has a structured value.

    Args:
        fsaccess: passed through to ``set_secondary``.
        builder: passed through to ``set_secondary``.
        inputs: iterable of input parameter schemas, each with an ``"id"``.
        job_order: mapping from short input name to the supplied value;
            values may be updated in place by ``set_secondary``.
        discovered: optional mapping passed through to ``set_secondary``.

    The schema no longer has to declare ``secondaryFiles`` at the top level:
    ``set_secondary`` starts with no inherited spec (``None``) and picks one
    up from whatever schema level declares it.
    """
    for inputschema in inputs:
        primary = job_order.get(shortname(inputschema["id"]))
        if isinstance(primary, (Mapping, Sequence)):
            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run,
builder_job_order,
discovered)
- visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+ copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+ visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
for d in list(discovered):
# Only interested in discovered secondaryFiles which are local
# Make a copy of the job order and set defaults.
builder_job_order = copy.copy(job_order)
- fill_in_defaults(tool.tool["inputs"],
+
+ # fill_in_defaults throws an error if there are any
+ # missing required parameters, we don't want it to do that
+ # so make them all optional.
+ inputs_copy = copy.deepcopy(tool.tool["inputs"])
+ for i in inputs_copy:
+ if "null" not in i["type"]:
+ i["type"] = ["null"] + aslist(i["type"])
+
+ fill_in_defaults(inputs_copy,
builder_job_order,
arvrunner.fs_access)
# Need to create a builder object to evaluate expressions.
builder = make_builder(builder_job_order,
- tool.tool.get("hints", []),
- tool.tool.get("requirements", []),
+ tool.hints,
+ tool.requirements,
ArvRuntimeContext())
# Now update job_order with secondaryFiles
discover_secondary_files(arvrunner.fs_access,