Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index e81f621750c8e2f8c7c310918b8f0725a68df9db..4861039198a18c36dbd0ae6d805be060cff1e224 100644 (file)
@@ -51,15 +51,16 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import (UnsupportedRequirement, normalizeFilesDirs,
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
                              shortname, Process, fill_in_defaults)
-from cwltool.load_tool import fetch_document
+from cwltool.load_tool import fetch_document, jobloaderctx
 from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
 from cwltool.builder import Builder
 import schema_salad.validate as validate
+import schema_salad.ref_resolver
 
 import arvados.collection
 import arvados.util
@@ -253,23 +254,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 if sfname is None:
                     continue
 
-                p_location = primary["location"]
-                if "/" in p_location:
-                    sfpath = (
-                        p_location[0 : p_location.rindex("/") + 1]
-                        + sfname
-                    )
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, list) or isinstance(sfname, dict):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
@@ -328,9 +339,12 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # if we haven't seen this file before, want raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
@@ -414,8 +428,14 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
@@ -585,7 +605,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -625,6 +645,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
             for l in v:
                 visit(l, cur_id)
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
@@ -670,9 +695,12 @@ def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
                              tool.tool["inputs"],
                              job_order)
 
+    _jobloaderctx = jobloaderctx.copy()
+    jobloader = schema_salad.ref_resolver.Loader(_jobloaderctx, fetcher_constructor=tool.doc_loader.fetcher_constructor)
+
     jobmapper = upload_dependencies(arvrunner,
                                     name,
-                                    tool.doc_loader,
+                                    jobloader,
                                     job_order,
                                     job_order.get("id", "#"),
                                     False,
@@ -700,28 +728,37 @@ def upload_workflow_deps(arvrunner, tool, runtimeContext):
 
     merged_map = {}
     tool_dep_cache = {}
+
+    todo = []
+
+    # Standard traversal is top down, we want to go bottom up, so use
+    # the visitor to accumulate a list of nodes to visit, then
+    # visit them in reverse order.
     def upload_tool_deps(deptool):
         if "id" in deptool:
-            discovered_secondaryfiles = {}
-            with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
-                pm = upload_dependencies(arvrunner,
-                                         "%s dependencies" % (shortname(deptool["id"])),
-                                         document_loader,
-                                         deptool,
-                                         deptool["id"],
-                                         False,
-                                         runtimeContext,
-                                         include_primary=False,
-                                         discovered_secondaryfiles=discovered_secondaryfiles,
-                                         cache=tool_dep_cache)
-            document_loader.idx[deptool["id"]] = deptool
-            toolmap = {}
-            for k,v in pm.items():
-                toolmap[k] = v.resolved
-            merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+            todo.append(deptool)
 
     tool.visit(upload_tool_deps)
 
+    for deptool in reversed(todo):
+        discovered_secondaryfiles = {}
+        with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+            pm = upload_dependencies(arvrunner,
+                                     "%s dependencies" % (shortname(deptool["id"])),
+                                     document_loader,
+                                     deptool,
+                                     deptool["id"],
+                                     False,
+                                     runtimeContext,
+                                     include_primary=False,
+                                     discovered_secondaryfiles=discovered_secondaryfiles,
+                                     cache=tool_dep_cache)
+        document_loader.idx[deptool["id"]] = deptool
+        toolmap = {}
+        for k,v in pm.items():
+            toolmap[k] = v.resolved
+        merged_map[deptool["id"]] = FileUpdates(toolmap, discovered_secondaryfiles)
+
     return merged_map
 
 def arvados_jobs_image(arvrunner, img, runtimeContext):
@@ -775,7 +812,8 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
         loadingContext = loadingContext.copy()
         loadingContext.metadata = updated_tool.metadata.copy()
@@ -804,6 +842,7 @@ class Runner(Process):
         self.priority = priority
         self.secret_store = secret_store
         self.enable_dev = loadingContext.enable_dev
+        self.git_info = git_info
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # defaut 1 GiB
@@ -888,239 +927,3 @@ class Runner(Process):
             self.arvrunner.output_callback({}, "permanentFail")
         else:
             self.arvrunner.output_callback(outputs, processStatus)
-
-
-
-
-# --- from cwltool ---
-
-
-CWL_IANA = "https://www.iana.org/assignments/media-types/application/cwl"
-
-
-def scandeps(
-    base: str,
-    doc: Union[CWLObjectType, MutableSequence[CWLObjectType]],
-    reffields: Set[str],
-    urlfields: Set[str],
-    loadref: Callable[[str, str], Union[CommentedMap, CommentedSeq, str, None]],
-    urljoin: Callable[[str, str], str] = urllib.parse.urljoin,
-    nestdirs: bool = True,
-    do_normalize: bool = True,
-) -> Optional[MutableSequence[CWLObjectType]]:
-
-    """Given a CWL document or input object, search for dependencies
-    (references to external files) of 'doc' and return them as a list
-    of File or Directory objects.
-
-    The 'base' is the base URL for relative references.
-
-    Looks for objects with 'class: File' or 'class: Directory' and
-    adds them to the list of dependencies.
-
-    Anything in 'urlfields' is also added as a File dependency.
-
-    Anything in 'reffields' (such as workflow step 'run') will be
-    added as a dependency and also loaded (using the 'loadref'
-    function) and recursively scanned for dependencies.  Those
-    dependencies will be added as secondary files to the primary file.
-
-    If "nestdirs" is true, create intermediate directory objects when
-    a file is located in a subdirectory under the starting directory.
-    This is so that if the dependencies are materialized, they will
-    produce the same relative file system locations.
-
-    """
-
-    if do_normalize:
-        import pprint
-        pprint.pprint(doc)
-
-    r: Optional[MutableSequence[CWLObjectType]] = None
-    if isinstance(doc, MutableMapping):
-        if "id" in doc:
-            if cast(str, doc["id"]).startswith("file://"):
-                df, _ = urllib.parse.urldefrag(cast(str, doc["id"]))
-                if base != df:
-                    if r is None:
-                        r = []
-                    r.append({"class": "File", "location": df, "format": CWL_IANA})
-                    base = df
-
-        if doc.get("class") in ("File", "Directory") and "location" in urlfields:
-            with Perf(metrics, "File or Directory with location"):
-                u = cast(Optional[str], doc.get("location", doc.get("path")))
-                if u and not u.startswith("_:"):
-                    deps = {
-                        "class": doc["class"],
-                        "location": urljoin(base, u),
-                    }  # type: CWLObjectType
-                    if "basename" in doc:
-                        deps["basename"] = doc["basename"]
-                    if doc["class"] == "Directory" and "listing" in doc:
-                        deps["listing"] = doc["listing"]
-                    if doc["class"] == "File" and "secondaryFiles" in doc:
-                        sd = scandeps(
-                            base,
-                            cast(
-                                Union[CWLObjectType, MutableSequence[CWLObjectType]],
-                                doc["secondaryFiles"],
-                            ),
-                            reffields,
-                            urlfields,
-                            loadref,
-                            urljoin=urljoin,
-                            nestdirs=nestdirs,
-                            do_normalize=False,
-                        )
-                        if sd:
-                            deps["secondaryFiles"] = cast(
-                                CWLOutputAtomType,
-                                sd
-                            )
-                    if nestdirs:
-                        deps = nestdir(base, deps)
-                    if r is None:
-                        r = []
-                    r.append(deps)
-                else:
-                    if doc["class"] == "Directory" and "listing" in doc:
-                        sd = scandeps(
-                                base,
-                                cast(MutableSequence[CWLObjectType], doc["listing"]),
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                        if sd:
-                            if r is None:
-                                r = []
-                            r.extend(sd)
-                    elif doc["class"] == "File" and "secondaryFiles" in doc:
-                        sd = scandeps(
-                                base,
-                                cast(MutableSequence[CWLObjectType], doc["secondaryFiles"]),
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                        if sd:
-                            if r is None:
-                                r = sd
-                            else:
-                                r.extend(sd)
-
-        for k, v in doc.items():
-            if k in reffields:
-                with Perf(metrics, "k in reffields"):
-                    for u2 in aslist(v):
-                        if isinstance(u2, MutableMapping):
-                            sd = scandeps(
-                                    base,
-                                    u2,
-                                    reffields,
-                                    urlfields,
-                                    loadref,
-                                    urljoin=urljoin,
-                                    nestdirs=nestdirs,
-                                    do_normalize=False,
-                                )
-                            if sd:
-                                if r is None:
-                                    r = sd
-                                else:
-                                    r.extend(sd)
-                        else:
-                            subid = urljoin(base, u2)
-                            basedf, _ = urllib.parse.urldefrag(base)
-                            subiddf, _ = urllib.parse.urldefrag(subid)
-                            if basedf == subiddf:
-                                continue
-                            sub = cast(
-                                Union[MutableSequence[CWLObjectType], CWLObjectType],
-                                loadref(base, u2),
-                            )
-                            deps2 = {
-                                "class": "File",
-                                "location": subid,
-                                "format": CWL_IANA,
-                            }  # type: CWLObjectType
-                            sf = scandeps(
-                                subid,
-                                sub,
-                                reffields,
-                                urlfields,
-                                loadref,
-                                urljoin=urljoin,
-                                nestdirs=nestdirs,
-                                do_normalize=False,
-                            )
-                            if sf:
-                                deps2["secondaryFiles"] = cast(
-                                    MutableSequence[CWLOutputAtomType], mergedirs(sf)
-                                )
-                            if nestdirs:
-                                deps2 = nestdir(base, deps2)
-                            if r is None:
-                                r = []
-                            r.append(deps2)
-            elif k in urlfields and k != "location":
-                with Perf(metrics, "k in urlfields"):
-                    for u3 in aslist(v):
-                        deps = {"class": "File", "location": urljoin(base, u3)}
-                        if nestdirs:
-                            deps = nestdir(base, deps)
-                        if r is None:
-                            r = []
-                        r.append(deps)
-            elif doc.get("class") in ("File", "Directory") and k in (
-                "listing",
-                "secondaryFiles",
-            ):
-                # should be handled earlier.
-                pass
-            else:
-                with Perf(metrics, "k is something else"):
-                    sd = scandeps(
-                            base,
-                            cast(Union[MutableSequence[CWLObjectType], CWLObjectType], v),
-                            reffields,
-                            urlfields,
-                            loadref,
-                            urljoin=urljoin,
-                            nestdirs=nestdirs,
-                            do_normalize=False,
-                        )
-                    if sd:
-                        if r is None:
-                            r = sd
-                        else:
-                            r.extend(sd)
-    elif isinstance(doc, MutableSequence):
-        with Perf(metrics, "d in doc"):
-            for d in doc:
-                sd = scandeps(
-                        base,
-                        d,
-                        reffields,
-                        urlfields,
-                        loadref,
-                        urljoin=urljoin,
-                        nestdirs=nestdirs,
-                        do_normalize=False,
-                    )
-                if r is None:
-                    r = sd
-                else:
-                    r.extend(sd)
-
-    if r and do_normalize:
-        normalizeFilesDirs(r)
-
-    return r