10460: Add implied secondaryFiles based on input parameter spec.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 054d3530cfe174b9a5da3025d248097ca1062d7a..fd150d7457ee048ce713c6acda36cf72130368e4 100644 (file)
@@ -9,15 +9,18 @@ from cStringIO import StringIO
 import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
 
 import arvados.collection
 import ruamel.yaml as yaml
 
 from .arvdocker import arv_docker_get_image
 from .pathmapper import ArvPathMapper
+from ._version import __version__
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -115,13 +118,23 @@ def upload_docker(arvrunner, tool):
 def upload_instance(arvrunner, name, tool, job_order):
         upload_docker(arvrunner, tool)
 
+        for t in tool.tool["inputs"]:  # for each tool input that declares secondaryFiles, fill them in on job_order File objects
+            def setSecondary(fileobj):  # callback handed to adjustFileObjs below; closes over the current input t
+                if "__norecurse" in fileobj:  # marker placed on the entries generated by the list comprehension below
+                    del fileobj["__norecurse"]  # strip the marker and leave the generated entry otherwise untouched
+                    return
+                if "secondaryFiles" not in fileobj:  # a secondaryFiles list already present on the file object is kept as-is
+                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File", "__norecurse": True} for sf in t["secondaryFiles"]]  # one File entry per pattern, location derived via substitute()
+
+            if shortname(t["id"]) in job_order and t.get("secondaryFiles"):  # only when the job order has a value for this input and the input lists secondaryFiles
+                adjustFileObjs(job_order, setSecondary)  # NOTE(review): receives the whole job_order, not job_order[shortname(t["id"])]; if adjustFileObjs visits every File object, files of other inputs would get this input's patterns too -- confirm
+
         workflowmapper = upload_dependencies(arvrunner,
                                              name,
                                              tool.doc_loader,
                                              tool.tool,
                                              tool.tool["id"],
                                              True)
-
         jobmapper = upload_dependencies(arvrunner,
                                         os.path.basename(job_order.get("id", "#")),
                                         tool.doc_loader,
@@ -129,13 +142,18 @@ def upload_instance(arvrunner, name, tool, job_order):
                                         job_order.get("id", "#"),
                                         False)
 
-        adjustDirObjs(job_order, trim_listing)
-
         if "id" in job_order:
             del job_order["id"]
 
         return workflowmapper
 
+def arvados_jobs_image(arvrunner):  # returns the "arvados/jobs:<version>" image name once arv_docker_get_image succeeds for it
+    img = "arvados/jobs:"+__version__  # tag comes from this package's __version__
+    try:
+        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)  # return value is discarded; meaning of the True argument is not visible here -- TODO confirm
+    except Exception as e:  # any failure of the lookup is reported as "image not available"
+        raise Exception("Docker image %s is not available\n%s" % (img, e) )  # raises a plain Exception with the original error text appended on a second line
+    return img
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse, output_name):
@@ -153,7 +171,9 @@ class Runner(object):
 
     def arvados_job_spec(self, *args, **kwargs):
         self.name = os.path.basename(self.tool.tool["id"])
-        return upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
+        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)  # keep the mapper so job_order can be post-processed before returning
+        adjustDirObjs(self.job_order, trim_listing)  # trim_listing is applied here, after upload_instance returns; the same change removes this call from upload_instance
+        return workflowmapper  # same return value as before: whatever upload_instance returned
 
     def done(self, record):
         if record["state"] == "Complete":