import ruamel.yaml as yaml
-from .runner import upload_dependencies, trim_listing, packed_workflow
+from .runner import upload_dependencies, trim_listing, packed_workflow, upload_workflow_collection
from .arvtool import ArvadosCommandTool
from .perf import Perf
call = arvRunner.api.workflows().create(body=body)
return call.execute(num_retries=arvRunner.num_retries)["uuid"]
+def dedup_reqs(reqs):
+ dedup = {}
+ for r in reversed(reqs):
+ if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+ dedup[r["class"]] = r
+ return [dedup[r] for r in sorted(dedup.keys())]
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
+ self.wf_pdh = None
def job(self, joborder, output_callback, **kwargs):
kwargs["work_api"] = self.work_api
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
with Perf(metrics, "subworkflow upload_deps"):
- workflowobj["requirements"] = self.requirements + workflowobj.get("requirements", [])
- workflowobj["hints"] = self.hints + workflowobj.get("hints", [])
- packed = pack(document_loader, workflowobj, uri, self.metadata)
-
- upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
- document_loader,
- packed,
- uri,
- False)
-
upload_dependencies(self.arvrunner,
os.path.basename(joborder.get("id", "#")),
document_loader,
joborder.get("id", "#"),
False)
+ if self.wf_pdh is None:
+ workflowobj["requirements"] = dedup_reqs(self.requirements)
+ workflowobj["hints"] = dedup_reqs(self.hints)
+
+ packed = pack(document_loader, workflowobj, uri, self.metadata)
+
+ upload_dependencies(self.arvrunner,
+ kwargs.get("name", ""),
+ document_loader,
+ packed,
+ uri,
+ False)
+
with Perf(metrics, "subworkflow adjust"):
joborder_keepmount = copy.deepcopy(joborder)
adjustFileObjs(joborder_keepmount, keepmount)
adjustDirObjs(joborder_keepmount, keepmount)
- adjustFileObjs(packed, keepmount)
- adjustDirObjs(packed, keepmount)
+
+ if self.wf_pdh is None:
+ adjustFileObjs(packed, keepmount)
+ adjustDirObjs(packed, keepmount)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
wf_runner = cmap({
"class": "CommandLineTool",
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": yaml.round_trip_dump(packed).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": {
+ "class": "File",
+ "location": "keep:%s/workflow.cwl" % self.wf_pdh
+ }
}, {
"entryname": "cwl.input.yml",
- "entry": yaml.round_trip_dump(joborder_keepmount).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
+ "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
}]
}],
"hints": workflowobj["hints"],