from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
+from cwltool.context import LoadingContext
import ruamel.yaml as yaml
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
+ self.wf_reffiles = []
+ self.loadingContext = loadingContext
- def job(self, joborder, output_callback, **kwargs):
- kwargs["work_api"] = self.work_api
+ def job(self, joborder, output_callback, runtimeContext):
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder()
- builder.job = joborder
- builder.requirements = workflowobj["requirements"]
- builder.hints = workflowobj["hints"]
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
+ resources={})
def visit(item):
for t in ("hints", "requirements"):
self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
+ runtimeContext.name,
document_loader,
packed,
uri,
False)
+ # Discover files/directories referenced by the
+ # workflow (mainly "default" values)
+ visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
if self.dynamic_resource_req:
- builder = Builder()
- builder.job = joborder
- builder.requirements = self.requirements
- builder.hints = self.hints
- builder.resources = {}
+ builder = Builder(joborder,
+ requirements=self.requirements,
+ hints=self.hints,
+ resources={})
# Evaluate dynamic resource requirements using current builder
rs = copy.copy(self.static_resource_req)
joborder_keepmount = copy.deepcopy(joborder)
reffiles = []
- visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
+ visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
+
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
- mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ # For containers API, we need to make sure any extra
+ # referenced files (i.e. referenced by the workflow but
+ # not in the inputs) are included in the mounts.
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
def keepmount(obj):
remove_redundant_fields(obj)
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
"requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
{
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": {
- "class": "File",
- "location": "keep:%s/workflow.cwl" % self.wf_pdh
- }
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
}, {
"entryname": "cwl.input.yml",
"entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
"arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
"id": "#"
})
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)