10401: Add integration tests for directory listing behavior.
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index cd7e41a906b8fdacdea740a26d188a933348d9d9..4db1f4f2f4d8f48739bd7ef525e9526bdd2c3b4c 100644 (file)
@@ -13,7 +13,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import ruamel.yaml as yaml
 
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
 from .arvtool import ArvadosCommandTool
 from .perf import Perf
 
@@ -56,6 +56,13 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
         call = arvRunner.api.workflows().create(body=body)
     return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
+def dedup_reqs(reqs):  # Reduce reqs (a list of dicts with a "class" key) to at most one entry per class.
+    dedup = {}  # class name -> the single requirement dict retained for that class
+    for r in reversed(reqs):  # iterate back-to-front so the LAST entry of each class in reqs is the one kept
+        if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):  # ignore earlier duplicates; drop every class in the http://arvados.org/cwl# namespace
+            dedup[r["class"]] = r
+    return [dedup[r] for r in sorted(dedup.keys())]  # returned list is ordered by class name, not by position in reqs
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
@@ -63,6 +70,7 @@ class ArvadosWorkflow(Workflow):
         super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
         self.arvrunner = arvrunner
         self.work_api = kwargs["work_api"]
+        self.wf_pdh = None
 
     def job(self, joborder, output_callback, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -74,17 +82,6 @@ class ArvadosWorkflow(Workflow):
             document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
 
             with Perf(metrics, "subworkflow upload_deps"):
-                workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
-                workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
-                packed = pack(document_loader, workflowobj, uri, self.metadata)
-
-                upload_dependencies(self.arvrunner,
-                                    kwargs.get("name", ""),
-                                    document_loader,
-                                    packed,
-                                    uri,
-                                    False)
-
                 upload_dependencies(self.arvrunner,
                                     os.path.basename(joborder.get("id", "#")),
                                     document_loader,
@@ -92,6 +89,19 @@ class ArvadosWorkflow(Workflow):
                                     joborder.get("id", "#"),
                                     False)
 
+                if self.wf_pdh is None:
+                    workflowobj["requirements"] = dedup_reqs(self.requirements)
+                    workflowobj["hints"] = dedup_reqs(self.hints)
+
+                    packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+                    upload_dependencies(self.arvrunner,
+                                        kwargs.get("name", ""),
+                                        document_loader,
+                                        packed,
+                                        uri,
+                                        False)
+
             with Perf(metrics, "subworkflow adjust"):
                 joborder_keepmount = copy.deepcopy(joborder)
 
@@ -111,8 +121,11 @@ class ArvadosWorkflow(Workflow):
 
                 adjustFileObjs(joborder_keepmount, keepmount)
                 adjustDirObjs(joborder_keepmount, keepmount)
-                adjustFileObjs(packed, keepmount)
-                adjustDirObjs(packed, keepmount)
+
+                if self.wf_pdh is None:
+                    adjustFileObjs(packed, keepmount)
+                    adjustDirObjs(packed, keepmount)
+                    self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
 
             wf_runner = cmap({
                 "class": "CommandLineTool",
@@ -125,10 +138,13 @@ class ArvadosWorkflow(Workflow):
                     "class": "InitialWorkDirRequirement",
                     "listing": [{
                             "entryname": "workflow.cwl",
-                            "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": {
+                                "class": "File",
+                                "location": "keep:%s/workflow.cwl" % self.wf_pdh
+                            }
                         }, {
                             "entryname": "cwl.input.yml",
-                            "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+                            "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                         }]
                 }],
                 "hints": workflowobj["hints"],