16482: bump a-c-r's cwltool dependency to version 3.0.20200530110633 so
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 68b2fd827a6e62cc71fb1d30f96ede26f8db3eaf..7bb66a158e50646f1fc984df7ea30ccd31528bf3 100644 (file)
@@ -42,9 +42,10 @@ import schema_salad.validate as validate
 import arvados.collection
 from .util import collectionUUID
 import ruamel.yaml as yaml
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
 
 import arvados_cwl.arvdocker
-from .pathmapper import ArvPathMapper, trim_listing
+from .pathmapper import ArvPathMapper, trim_listing, collection_pdh_pattern, collection_uuid_pattern
 from ._version import __version__
 from . import done
 from . context import ArvRuntimeContext
@@ -137,7 +138,8 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
         # set secondaryFiles, may be inherited by compound types.
         secondaryspec = inputschema["secondaryFiles"]
 
-    if isinstance(inputschema["type"], (Mapping, Sequence)):
+    if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+        not isinstance(inputschema["type"], basestring)):
         # compound type (union, array, record)
         set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
 
@@ -173,7 +175,7 @@ def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discov
             if pattern is None:
                 continue
             sfpath = substitute(primary["location"], pattern)
-            required = builder.do_eval(sf["required"], context=primary)
+            required = builder.do_eval(sf.get("required"), context=primary)
 
             if fsaccess.exists(sfpath):
                 primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
@@ -193,9 +195,6 @@ def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=No
         if isinstance(primary, (Mapping, Sequence)):
             set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
 
-collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
-collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
-
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
                         include_primary=True, discovered_secondaryfiles=None):
@@ -336,7 +335,8 @@ def upload_dependencies(arvrunner, name, document_loader,
                                  builder_job_order,
                                  discovered)
 
-    visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
+    copied, _ = document_loader.resolve_all(copy.deepcopy(cmap(workflowobj)), base_url=uri, checklinks=False)
+    visit_class(copied, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
 
     for d in list(discovered):
         # Only interested in discovered secondaryFiles which are local
@@ -393,7 +393,7 @@ def upload_dependencies(arvrunner, name, document_loader,
             discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
 
     if "$schemas" in workflowobj:
-        sch = []
+        sch = CommentedSeq()
         for s in workflowobj["$schemas"]:
             sch.append(mapper.mapper(s).resolved)
         workflowobj["$schemas"] = sch
@@ -425,8 +425,9 @@ def packed_workflow(arvrunner, tool, merged_map):
     A "packed" workflow is one where all the components have been combined into a single document."""
 
     rewrites = {}
-    packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
-                  tool.tool["id"], tool.metadata, rewrite_out=rewrites)
+    packed = pack(arvrunner.loadingContext, tool.tool["id"],
+                  rewrite_out=rewrites,
+                  loader=tool.doc_loader)
 
     rewrite_to_orig = {v: k for k,v in viewitems(rewrites)}
 
@@ -469,7 +470,16 @@ def upload_job_order(arvrunner, name, tool, job_order):
 
     # Make a copy of the job order and set defaults.
     builder_job_order = copy.copy(job_order)
-    fill_in_defaults(tool.tool["inputs"],
+
+    # fill_in_defaults throws an error if there are any
+    # missing required parameters, we don't want it to do that
+    # so make them all optional.
+    inputs_copy = copy.deepcopy(tool.tool["inputs"])
+    for i in inputs_copy:
+        if "null" not in i["type"]:
+            i["type"] = ["null"] + aslist(i["type"])
+
+    fill_in_defaults(inputs_copy,
                      builder_job_order,
                      arvrunner.fs_access)
     # Need to create a builder object to evaluate expressions.
@@ -570,7 +580,8 @@ class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, loadingContext, enable_reuse,
+    def __init__(self, runner, updated_tool,
+                 tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
@@ -579,10 +590,9 @@ class Runner(Process):
                  collection_cache_is_default=True):
 
         loadingContext = loadingContext.copy()
-        loadingContext.metadata = loadingContext.metadata.copy()
-        loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+        loadingContext.metadata = updated_tool.metadata.copy()
 
-        super(Runner, self).__init__(tool.tool, loadingContext)
+        super(Runner, self).__init__(updated_tool.tool, loadingContext)
 
         self.arvrunner = runner
         self.embedded_tool = tool