from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
+from cwltool.context import LoadingContext
import ruamel.yaml as yaml
upload_dependencies(arvRunner, name, tool.doc_loader,
packed, tool.tool["id"], False)
- # TODO nowhere for submit_runner_ram to go.
+ if submit_runner_ram:
+ hints = main.get("hints", [])
+ found = False
+ for h in hints:
+ if h["class"] == "http://arvados.org/cwl#WorkflowRunnerResources":
+ h["ramMin"] = submit_runner_ram
+ found = True
+ break
+ if not found:
+ hints.append({"class": "http://arvados.org/cwl#WorkflowRunnerResources",
+ "ramMin": submit_runner_ram})
+ main["hints"] = hints
body = {
"workflow": {
"name": name,
"description": tool.tool.get("doc", ""),
- "definition":yaml.round_trip_dump(packed)
+ "definition":json.dumps(packed, sort_keys=True, indent=4, separators=(',',': '))
}}
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
+ self.loadingContext = loadingContext
- def job(self, joborder, output_callback, **kwargs):
- kwargs["work_api"] = self.work_api
+ def job(self, joborder, output_callback, runtimeContext):
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder()
- builder.job = joborder
- builder.requirements = workflowobj["requirements"]
- builder.hints = workflowobj["hints"]
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
+ resources={})
def visit(item):
for t in ("hints", "requirements"):
self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
+ runtimeContext.name,
document_loader,
packed,
uri,
if self.dynamic_resource_req:
- builder = Builder()
- builder.job = joborder
- builder.requirements = self.requirements
- builder.hints = self.hints
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=self.requirements,
+ hints=self.hints,
+ resources={})
# Evaluate dynamic resource requirements using current builder
rs = copy.copy(self.static_resource_req)
reffiles = []
visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
- mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
# For containers API, we need to make sure any extra
# referenced files (ie referenced by the workflow but
# not in the inputs) are included in the mounts.
- kwargs["extra_reffiles"] = copy.deepcopy(self.wf_reffiles)
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
def keepmount(obj):
remove_redundant_fields(obj)
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
"requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
{
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": {
- "class": "File",
- "location": "keep:%s/workflow.cwl" % self.wf_pdh
- }
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
}, {
"entryname": "cwl.input.yml",
"entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
"arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
"id": "#"
})
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)