16535: Merge branch 'master'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 2239e0f9df952b9ab75a7e9a96ab46953ab29f94..b10f02d1401b9e31014eb30b32e18adfdcb394d2 100644 (file)
@@ -42,6 +42,7 @@ import schema_salad.validate as validate
 import arvados.collection
 from .util import collectionUUID
 import ruamel.yaml as yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
@@ -82,7 +83,7 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -105,6 +106,7 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion")
                 )
 
 def search_schemadef(name, reqs):
@@ -168,21 +170,50 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                specs.append({"pattern": pattern})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = sf["basename"]
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
             sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                found.append({"location": sfpath, "class": "File"})
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
@@ -236,6 +267,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
                   loadref_fields,
                   set(("$include", "$schemas", "location")),
@@ -327,7 +360,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
@@ -392,7 +426,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
             sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
@@ -424,17 +458,22 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
     def visit(v, cur_id):
         if isinstance(v, dict):
             if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if "path" in v and "location" not in v:
+                v["location"] = v["path"]
+                del v["path"]
             if "location" in v and not v["location"].startswith("keep:"):
                 v["location"] = merged_map[cur_id].resolved[v["location"]]
             if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
@@ -484,7 +523,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
                              builder,