15028: Handling secondaryFiles on record fields
author Peter Amstutz <pamstutz@veritasgenetics.com>
Thu, 2 May 2019 21:20:10 +0000 (17:20 -0400)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Fri, 3 May 2019 15:44:37 +0000 (11:44 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py

index 4a8a22a4a99c23ca8f7b0df04ceba5b0a67d5890..752cc32ba69cfdf2dc880f7de9338251e4d1d3b1 100644 (file)
@@ -5,6 +5,7 @@
 from future import standard_library
 standard_library.install_aliases()
 from future.utils import  viewvalues, viewitems
+from past.builtins import basestring
 
 import os
 import sys
@@ -106,36 +107,91 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  stagedir="",            # type: Text
                 )
 
+def search_schemadef(name, reqs):
+    for r in reqs:
+        if r["class"] == "SchemaDefRequirement":
+            for sd in r["types"]:
+                if sd["name"] == name:
+                    return sd
+    return None
+
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+                                 "float", "double", "string", "record",
+                                 "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
+    if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+        # union type, collect all possible secondaryFiles
+        for i in inputschema:
+            set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+        return
+
+    if isinstance(inputschema, basestring):
+        sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+        if sd:
+            inputschema = sd
+        else:
+            return
 
-def set_secondary(fsaccess, builder, inputschema, primary, discovered):
-    if isinstance(primary, Mapping) and primary.get("class") == "File":
-        if "secondaryFiles" not in primary:
-            primary["secondaryFiles"] = []
-            for i, sf in enumerate(inputschema["secondaryFiles"]):
-                pattern = builder.do_eval(sf["pattern"], context=primary)
-                if pattern is None:
-                    continue
-                sfpath = substitute(primary["location"], pattern)
-                required = builder.do_eval(sf["required"], context=primary)
-
-                if fsaccess.exists(sfpath):
-                    primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
-                elif required:
-                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                        "Required secondary file '%s' does not exist" % sfpath)
-
-            primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
-            if discovered is not None:
-                discovered[primary["location"]] = primary["secondaryFiles"]
-    elif isinstance(primary, Sequence):
-        for e in primary:
-            set_secondary(fsaccess, builder, inputschema, e, discovered)
+    if "secondaryFiles" in inputschema:
+        # set secondaryFiles, may be inherited by compound types.
+        secondaryspec = inputschema["secondaryFiles"]
+
+    if isinstance(inputschema["type"], Mapping):
+        # compound type (array or record)
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+    elif (inputschema["type"] == "record" and
+          isinstance(primary, Mapping)):
+        #
+        # record type, find secondary files associated with fields.
+        #
+        for f in inputschema["fields"]:
+            p = primary.get(shortname(f["name"]))
+            if p:
+                set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "array" and
+          isinstance(primary, Sequence)):
+        #
+        # array type, find secondary files of elements
+        #
+        for p in primary:
+            set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+    elif (inputschema["type"] == "File" and
+          secondaryspec and
+          isinstance(primary, Mapping) and
+          primary.get("class") == "File" and
+          "secondaryFiles" not in primary):
+        #
+        # Found a file, check for secondaryFiles
+        #
+        primary["secondaryFiles"] = []
+        for i, sf in enumerate(aslist(secondaryspec)):
+            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if pattern is None:
+                continue
+            sfpath = substitute(primary["location"], pattern)
+            required = builder.do_eval(sf["required"], context=primary)
+
+            if fsaccess.exists(sfpath):
+                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+            elif required:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Required secondary file '%s' does not exist" % sfpath)
+
+        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        if discovered is not None:
+            discovered[primary["location"]] = primary["secondaryFiles"]
+    elif inputschema["type"] not in primitive_types_set:
+        set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
 def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
     for inputschema in inputs:
         primary = job_order.get(shortname(inputschema["id"]))
-        if isinstance(primary, (Mapping, Sequence)) and inputschema.get("secondaryFiles"):
-            set_secondary(fsaccess, builder, inputschema, primary, discovered)
+        if isinstance(primary, (Mapping, Sequence)):
+            set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
@@ -418,8 +474,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
                      arvrunner.fs_access)
     # Need to create a builder object to evaluate expressions.
     builder = make_builder(builder_job_order,
-                           tool.tool.get("hints", []),
-                           tool.tool.get("requirements", []),
+                           tool.hints,
+                           tool.requirements,
                            ArvRuntimeContext())
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
index c9776eac1439f656615b99902cfe03940e7fcd28..3af3dfb4daabb1ae8a0e1303c059ca86c3b1ee99 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20190426195829',
+          'cwltool==1.0.20190502174223',
           'schema-salad==4.1.20190305210046',
           'typing >= 3.6.4',
           'ruamel.yaml >=0.15.54, <=0.15.77',