19464: Record & report provenance information from git
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 2582c0a3a3e3fd88c464b58a23c5d74b0281b92e..1544d05cd70660c6e046ef80073b7c80fb7c52c2 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -17,7 +17,30 @@ import json
 import copy
 from collections import namedtuple
 from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Mapping,
+    MutableMapping,
+    Sequence,
+    MutableSequence,
+    Optional,
+    Set,
+    Sized,
+    Tuple,
+    Type,
+    Union,
+    cast,
+)
+from cwltool.utils import (
+    CWLObjectType,
+    CWLOutputAtomType,
+    CWLOutputType,
+)
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -230,23 +253,33 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
                 if sfname is None:
                     continue
 
-                p_location = primary["location"]
-                if "/" in p_location:
-                    sfpath = (
-                        p_location[0 : p_location.rindex("/") + 1]
-                        + sfname
-                    )
+                if isinstance(sfname, str):
+                    p_location = primary["location"]
+                    if "/" in p_location:
+                        sfpath = (
+                            p_location[0 : p_location.rindex("/") + 1]
+                            + sfname
+                        )
 
             required = builder.do_eval(required, context=primary)
 
-            if fsaccess.exists(sfpath):
-                if pattern is not None:
-                    found.append({"location": sfpath, "class": "File"})
-                else:
-                    found.append(sf)
-            elif required:
-                raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
-                    "Required secondary file '%s' does not exist" % sfpath)
+            if isinstance(sfname, (list, dict)):
+                each = aslist(sfname)
+                for e in each:
+                    if required and not fsaccess.exists(e.get("location")):
+                        raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                            "Required secondary file '%s' does not exist" % e.get("location"))
+                found.extend(each)
+
+            if isinstance(sfname, str):
+                if fsaccess.exists(sfpath):
+                    if pattern is not None:
+                        found.append({"location": sfpath, "class": "File"})
+                    else:
+                        found.append(sf)
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
 
         primary["secondaryFiles"] = cmap(found)
         if discovered is not None:
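
The two new branches distinguish the shapes a secondaryFiles pattern
expression can evaluate to: a plain filename string, or one or more
fully-formed File objects. A minimal sketch of that normalization,
reusing cwltool's aslist() helper (the function and argument names
here are illustrative, not from the diff):

    from cwltool.utils import aslist

    def normalize_expression_result(sfname, primary_location):
        if isinstance(sfname, str):
            # A string names a sibling of the primary file, so splice
            # it onto the primary's directory prefix.
            base = primary_location[0:primary_location.rindex("/") + 1]
            return [{"class": "File", "location": base + sfname}]
        # A dict (a single File object) or a list of dicts is used
        # as-is; aslist() wraps a lone object so both shapes iterate
        # the same way.
        return aslist(sfname)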
@@ -305,9 +338,12 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     scanobj = workflowobj
     if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
-        # Need raw file content (before preprocessing) to ensure
-        # that external references in $include and $mixin are captured.
-        scanobj = loadref("", workflowobj["id"])
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if cache is not None and defrg not in cache:
+            # If we haven't seen this file before, we want the raw file
+            # content (before preprocessing) to ensure that external
+            # references like $include haven't already been inlined.
+            scanobj = loadref("", workflowobj["id"])
 
     metadata = scanobj
 
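
For reference, urllib.parse.urldefrag() splits the fragment off a URI,
so the cache lookup above is keyed on the underlying document rather
than on a particular step or tool inside it (the URI below is
illustrative):

    import urllib.parse

    defrg, frag = urllib.parse.urldefrag("file:///work/wf.cwl#main")
    # defrg == "file:///work/wf.cwl", frag == "main"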
@@ -325,7 +361,14 @@ def upload_dependencies(arvrunner, name, document_loader,
                                       loadref, urljoin=document_loader.fetcher.urljoin,
                                       nestdirs=False)
 
-    sc_result.extend(optional_deps)
+    if sc_result is None:
+        sc_result = []
+
+    if optional_deps is None:
+        optional_deps = []
+
+    if optional_deps:
+        sc_result.extend(optional_deps)
 
     sc = []
     uuids = {}
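
The guards above treat a None result from scandeps() the same as an
empty dependency list, which keeps the iteration and
normalizeFilesDirs() calls that follow unconditional. A compact
equivalent, shown only as a sketch:

    # Treat a missing dependency scan the same as an empty one.
    sc_result = (sc_result or []) + (optional_deps or [])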
@@ -384,8 +427,14 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     normalizeFilesDirs(sc)
 
-    if include_primary and "id" in workflowobj:
-        sc.append({"class": "File", "location": workflowobj["id"]})
+    if "id" in workflowobj:
+        defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+        if include_primary:
+            # make sure it's included
+            sc.append({"class": "File", "location": defrg})
+        else:
+            # make sure it's excluded
+            sc = [d for d in sc if d.get("location") != defrg]
 
     def visit_default(obj):
         def defaults_are_optional(f):
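
A sketch of the new include/exclude behavior with illustrative
locations: the primary document's fragment-free id is either appended
to the dependency list or filtered out of it.

    import urllib.parse

    defrg, _ = urllib.parse.urldefrag("file:///work/wf.cwl#main")

    sc = [{"class": "File", "location": "file:///work/wf.cwl"},
          {"class": "File", "location": "file:///work/input.txt"}]
    sc = [d for d in sc if d.get("location") != defrg]
    # only input.txt remains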
@@ -555,7 +604,7 @@ def upload_docker(arvrunner, tool, runtimeContext):
             upload_docker(arvrunner, s.embedded_tool, runtimeContext)
 
 
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
     """Create a packed workflow.
 
     A "packed" workflow is one where all the components have been combined into a single document."""
@@ -595,6 +644,11 @@ def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
             for l in v:
                 visit(l, cur_id)
     visit(packed, None)
+
+    if git_info:
+        for g in git_info:
+            packed[g] = git_info[g]
+
     return packed
 
 
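
Since git_info is a plain mapping of metadata keys to strings, the
merge above simply copies each entry to the top level of the packed
document. A sketch with a hypothetical key name (the real keys are
assembled by the caller that gathers the git metadata):

    git_info = {
        "http://arvados.org/cwl#gitCommit":
            "1544d05cd70660c6e046ef80073b7c80fb7c52c2",
    }
    packed = {"cwlVersion": "v1.2", "$graph": []}
    for g in git_info:
        packed[g] = git_info[g]
    # packed now carries the provenance entries alongside "$graph"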
@@ -745,7 +799,8 @@ class Runner(Process):
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
-                 collection_cache_is_default=True):
+                 collection_cache_is_default=True,
+                 git_info=None):
 
         loadingContext = loadingContext.copy()
         loadingContext.metadata = updated_tool.metadata.copy()
@@ -774,6 +829,7 @@ class Runner(Process):
         self.priority = priority
         self.secret_store = secret_store
         self.enable_dev = loadingContext.enable_dev
+        self.git_info = git_info
 
         self.submit_runner_cores = 1
         self.submit_runner_ram = 1024  # default 1 GiB