from cwltool.pack import pack
from cwltool.load_tool import fetch_document
from cwltool.process import shortname
-from cwltool.workflow import Workflow, WorkflowException
+from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
from cwltool.context import LoadingContext
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, check_cluster_target
+from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
overall_res_req["class"] = "ResourceRequirement"
return cmap(overall_res_req)
+class ArvadosWorkflowStep(WorkflowStep):
+ def __init__(self,
+ toolpath_object, # type: Dict[Text, Any]
+ pos, # type: int
+ loadingContext, # type: LoadingContext
+ arvrunner,
+ *argc,
+ **argv
+ ): # type: (...) -> None
+
+ super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+ self.tool["class"] = "WorkflowStep"
+ self.arvrunner = arvrunner
+
+ def job(self, joborder, output_callback, runtimeContext):
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.toplevel = True # Preserve behavior for #13365
+
+ builder = make_builder({shortname(k): v for k,v in joborder.items()}, self.hints, self.requirements, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+ return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
def __init__(self, arvrunner, toolpath_object, loadingContext):
- super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
self.wf_reffiles = []
self.loadingContext = loadingContext
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+ self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
def job(self, joborder, output_callback, runtimeContext):
- check_cluster_target(self, self._init_job(joborder, runtimeContext), runtimeContext)
+ builder = self._init_job(joborder, runtimeContext)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if not req:
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder(joborder,
- requirements=workflowobj["requirements"],
- hints=workflowobj["hints"],
- resources={})
-
def visit(item):
for t in ("hints", "requirements"):
if t not in item:
if self.dynamic_resource_req:
- builder = Builder(joborder,
- requirements=self.requirements,
- hints=self.hints,
- resources={})
-
# Evaluate dynamic resource requirements using current builder
rs = copy.copy(self.static_resource_req)
for dyn_rs in self.dynamic_resource_req:
"id": "#"
})
return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+
+ def make_workflow_step(self,
+ toolpath_object, # type: Dict[Text, Any]
+ pos, # type: int
+ loadingContext, # type: LoadingContext
+ *argc,
+ **argv
+ ):
+ # (...) -> WorkflowStep
+ return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)