Merge branch '18947-githttpd'
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index ea4ecf2b6d78cdefababfa94b923a5f4183338d5..7d4310b0e0ce94b9430ded7f60ca04416e2964b9 100644 (file)
@@ -31,8 +31,7 @@ import cwltool.workflow
 from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
@@ -41,7 +40,7 @@ import schema_salad.validate as validate
 
 import arvados.collection
 from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
 from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
@@ -83,7 +82,7 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
     return Builder(
                  job=joborder,
                  files=[],               # type: List[Dict[Text, Text]]
@@ -106,6 +105,8 @@ def make_builder(joborder, hints, requirements, runtimeContext):
                  outdir="",              # type: Text
                  tmpdir="",              # type: Text
                  stagedir="",            # type: Text
+                 cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+                 container_engine="docker"
                 )
 
 def search_schemadef(name, reqs):
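
make_builder() now receives the tool metadata and derives the effective
cwlVersion from it, preferring the original version that cwltool records
before upgrading a document internally. A minimal sketch of just that
fallback, using hypothetical metadata dicts (the real ones come from
tool.metadata):

    upgraded = {"cwlVersion": "v1.2",
                "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"}
    plain = {"cwlVersion": "v1.1"}

    def effective_version(metadata):
        # prefer the pre-upgrade version when cwltool recorded one
        return (metadata.get("http://commonwl.org/cwltool#original_cwlVersion")
                or metadata.get("cwlVersion"))

    assert effective_version(upgraded) == "v1.0"
    assert effective_version(plain) == "v1.1"
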
@@ -169,21 +170,63 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         #
         # Found a file, check for secondaryFiles
         #
-        primary["secondaryFiles"] = []
+        specs = []
+        primary["secondaryFiles"] = secondaryspec
         for i, sf in enumerate(aslist(secondaryspec)):
-            pattern = builder.do_eval(sf["pattern"], context=primary)
+            if builder.cwlVersion == "v1.0":
+                pattern = builder.do_eval(sf, context=primary)
+            else:
+                pattern = builder.do_eval(sf["pattern"], context=primary)
             if pattern is None:
                 continue
-            sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf.get("required"), context=primary)
+            if isinstance(pattern, list):
+                specs.extend(pattern)
+            elif isinstance(pattern, dict):
+                specs.append(pattern)
+            elif isinstance(pattern, str):
+                if builder.cwlVersion == "v1.0":
+                    specs.append({"pattern": pattern, "required": True})
+                else:
+                    specs.append({"pattern": pattern, "required": sf.get("required")})
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+        found = []
+        for i, sf in enumerate(specs):
+            if isinstance(sf, dict):
+                if sf.get("class") == "File":
+                    pattern = None
+                    if sf.get("location") is None:
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "File object is missing 'location': %s" % sf)
+                    sfpath = sf["location"]
+                    required = True
+                else:
+                    pattern = sf["pattern"]
+                    required = sf.get("required")
+            elif isinstance(sf, str):
+                pattern = sf
+                required = True
+            else:
+                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                    "Expression must return list, object, string or null")
+
+            if pattern is not None:
+                sfpath = substitute(primary["location"], pattern)
+
+            required = builder.do_eval(required, context=primary)
 
             if fsaccess.exists(sfpath):
-                primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                if pattern is not None:
+                    found.append({"location": sfpath, "class": "File"})
+                else:
+                    found.append(sf)
             elif required:
                 raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
                     "Required secondary file '%s' does not exist" % sfpath)
 
-        primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+        primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
             discovered[primary["location"]] = primary["secondaryFiles"]
     elif inputschema["type"] not in primitive_types_set:
@@ -222,7 +265,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                 textIO = StringIO(text.decode('utf-8'))
             else:
                 textIO = StringIO(text)
-            return yaml.safe_load(textIO)
+            yamlloader = YAML(typ='safe', pure=True)
+            return yamlloader.load(textIO)
         else:
             return {}
 
@@ -237,10 +281,21 @@ def upload_dependencies(arvrunner, name, document_loader,
         # that external references in $include and $mixin are captured.
         scanobj = loadref("", workflowobj["id"])
 
+    metadata = scanobj
+
     sc_result = scandeps(uri, scanobj,
-                  loadref_fields,
-                  set(("$include", "$schemas", "location")),
-                  loadref, urljoin=document_loader.fetcher.urljoin)
+                         loadref_fields,
+                         set(("$include", "location")),
+                         loadref, urljoin=document_loader.fetcher.urljoin,
+                         nestdirs=False)
+
+    optional_deps = scandeps(uri, scanobj,
+                             loadref_fields,
+                             set(("$schemas",)),
+                             loadref, urljoin=document_loader.fetcher.urljoin,
+                             nestdirs=False)
+
+    sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
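
Dependency scanning is now split into two passes: $include directives and
file/directory locations stay hard requirements, while $schemas are
collected separately as optional_deps and merged back in, so schema files
are uploaded when reachable but no longer abort the run when missing (see
the guarded $schemas rewrite further down). Conceptually:

    # toy illustration: one upload pass covers both lists, but only the
    # required entries may raise when a file cannot be found
    required = [{"class": "File", "location": "file:///work/tool.cwl"}]
    optional_deps = [{"class": "File", "location": "file:///work/edam.owl"}]
    required.extend(optional_deps)
    assert len(required) == 2
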
@@ -298,24 +353,14 @@ def upload_dependencies(arvrunner, name, document_loader,
     if include_primary and "id" in workflowobj:
         sc.append({"class": "File", "location": workflowobj["id"]})
 
-    if "$schemas" in workflowobj:
-        for s in workflowobj["$schemas"]:
-            sc.append({"class": "File", "location": s})
-
     def visit_default(obj):
-        remove = [False]
-        def ensure_default_location(f):
+        def defaults_are_optional(f):
             if "location" not in f and "path" in f:
                 f["location"] = f["path"]
                 del f["path"]
-            if "location" in f and not arvrunner.fs_access.exists(f["location"]):
-                # Doesn't exist, remove from list of dependencies to upload
-                sc[:] = [x for x in sc if x["location"] != f["location"]]
-                # Delete "default" from workflowobj
-                remove[0] = True
-        visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
-        if remove[0]:
-            del obj["default"]
+            normalizeFilesDirs(f)
+            optional_deps.append(f)
+        visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
 
     find_defaults(workflowobj, visit_default)
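
Previously visit_default() deleted a default entirely when its file did
not exist; defaults_are_optional() instead normalizes the File/Directory
object and routes it into optional_deps, so a resolvable default is
uploaded while an unresolvable one is left for the runtime to handle. A
toy version of the normalization step (normalizeFilesDirs omitted):

    f = {"class": "File", "path": "inputs/ref.fa"}  # hypothetical default
    optional_deps = []
    if "location" not in f and "path" in f:
        f["location"] = f.pop("path")   # normalize path -> location
    optional_deps.append(f)             # uploaded if present, skipped if not
    assert f == {"class": "File", "location": "inputs/ref.fa"}
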
 
@@ -328,7 +373,8 @@ def upload_dependencies(arvrunner, name, document_loader,
         builder = make_builder(builder_job_order,
                                obj.get("hints", []),
                                obj.get("requirements", []),
-                               ArvRuntimeContext())
+                               ArvRuntimeContext(),
+                               metadata)
         discover_secondary_files(arvrunner.fs_access,
                                  builder,
                                  obj["inputs"],
@@ -350,7 +396,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                            "keep:%s",
                            "keep:%s/%s",
                            name=name,
-                           single_collection=True)
+                           single_collection=True,
+                           optional_deps=optional_deps)
 
     def setloc(p):
         loc = p.get("location")
@@ -395,7 +442,8 @@ def upload_dependencies(arvrunner, name, document_loader,
     if "$schemas" in workflowobj:
         sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
-            sch.append(mapper.mapper(s).resolved)
+            if s in mapper:
+                sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
 
     return mapper
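
Because $schemas are now optional dependencies, some may have no entry in
the returned path mapper, so the guarded rewrite just above keeps only the
ones that actually resolved (PathMapper supports the `in` membership
test). A toy equivalent with a plain dict standing in for the mapper:

    resolved = {"file:///work/edam.owl": "keep:9f0/edam.owl"}  # hypothetical
    schemas = ["file:///work/edam.owl", "file:///gone/old.owl"]
    kept = [resolved[s] for s in schemas if s in resolved]
    assert kept == ["keep:9f0/edam.owl"]
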
@@ -411,9 +459,16 @@ def upload_docker(arvrunner, tool):
                 # TODO: can be supported by containers API, but not jobs API.
                 raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                     "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
         else:
-            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+            arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+                                                       True, arvrunner.project_uuid,
+                                                       arvrunner.runtimeContext.force_docker_pull,
+                                                       arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                       arvrunner.runtimeContext.match_local_docker)
     elif isinstance(tool, cwltool.workflow.Workflow):
         for s in tool.steps:
             upload_docker(arvrunner, s.embedded_tool)
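
Both branches of upload_docker() now forward the runtime options
(force_docker_pull, tmp_outdir_prefix, match_local_docker) to
arv_docker_get_image(), and the fallback runner image is pinned to the
SDK's own release rather than a floating arvados/jobs tag. The pin itself
is plain string concatenation:

    __version__ = "2.4.1"  # hypothetical SDK version
    docker_req = {"dockerPull": "arvados/jobs:" + __version__}
    assert docker_req["dockerPull"] == "arvados/jobs:2.4.1"
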
@@ -433,19 +488,25 @@ def packed_workflow(arvrunner, tool, merged_map):
 
     def visit(v, cur_id):
         if isinstance(v, dict):
-            if v.get("class") in ("CommandLineTool", "Workflow"):
-                if "id" not in v:
-                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
-                cur_id = rewrite_to_orig.get(v["id"], v["id"])
+            if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
+                if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
+                    raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use cwlVersion: v1.1")
+                if "id" in v:
+                    cur_id = rewrite_to_orig.get(v["id"], v["id"])
             if "path" in v and "location" not in v:
                 v["location"] = v["path"]
                 del v["path"]
-            if "location" in v and not v["location"].startswith("keep:"):
-                v["location"] = merged_map[cur_id].resolved[v["location"]]
-            if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
-                v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+            if "location" in v and cur_id in merged_map:
+                if v["location"] in merged_map[cur_id].resolved:
+                    v["location"] = merged_map[cur_id].resolved[v["location"]]
+                if v["location"] in merged_map[cur_id].secondaryFiles:
+                    v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
             if v.get("class") == "DockerRequirement":
-                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+                v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+                                                                                                             arvrunner.project_uuid,
+                                                                                                             arvrunner.runtimeContext.force_docker_pull,
+                                                                                                             arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                                                                             arvrunner.runtimeContext.match_local_docker)
             for l in v:
                 visit(v[l], cur_id)
         if isinstance(v, list):
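
The packed-workflow visitor now also accepts ExpressionTool, requires an
explicit 'id' only on v1.0 documents (later versions allow anonymous
embedded processes), and guards each location rewrite so ids without an
entry in merged_map pass through untouched. A toy run of the guarded
lookup, with a namedtuple standing in for the merged_map values:

    from collections import namedtuple

    Mapped = namedtuple("Mapped", ["resolved", "secondaryFiles"])
    merged_map = {"#main": Mapped({"file:///work/a.txt": "keep:9f0/a.txt"}, {})}

    v, cur_id = {"location": "file:///work/a.txt"}, "#main"
    if "location" in v and cur_id in merged_map:        # skip unknown ids
        if v["location"] in merged_map[cur_id].resolved:
            v["location"] = merged_map[cur_id].resolved[v["location"]]
    assert v["location"] == "keep:9f0/a.txt"
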
@@ -489,7 +550,8 @@ def upload_job_order(arvrunner, name, tool, job_order):
     builder = make_builder(builder_job_order,
                            tool.hints,
                            tool.requirements,
-                           ArvRuntimeContext())
+                           ArvRuntimeContext(),
+                           tool.metadata)
     # Now update job_order with secondaryFiles
     discover_secondary_files(arvrunner.fs_access,
                              builder,
@@ -549,7 +611,10 @@ def arvados_jobs_image(arvrunner, img):
     """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""
 
     try:
-        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+        return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
+                                                          arvrunner.runtimeContext.force_docker_pull,
+                                                          arvrunner.runtimeContext.tmp_outdir_prefix,
+                                                          arvrunner.runtimeContext.match_local_docker)
     except Exception as e:
         raise Exception("Docker image %s is not available\n%s" % (img, e) )