14510: Setting collection cache wip
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index 5f4f8867a46d3373b2307f4266356427112b1114..f514476b9099895d3739dbb34ab0a0c372bd7e73 100644 (file)
@@ -14,7 +14,6 @@ from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
@@ -22,7 +21,7 @@ import ruamel.yaml as yaml
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, check_cluster_target
+from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -133,26 +132,31 @@ class ArvadosWorkflowStep(WorkflowStep):
         self.arvrunner = arvrunner
 
     def job(self, joborder, output_callback, runtimeContext):
-        builder = self._init_job({shortname(k): v for k,v in joborder.items()}, runtimeContext)
-        check_cluster_target(self, builder, runtimeContext)
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.toplevel = True  # Preserve behavior for #13365
+
+        builder = make_builder({shortname(k): v for k,v in joborder.items()}, self.hints, self.requirements, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
         return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
 
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, loadingContext):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
         self.loadingContext = loadingContext
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
 
         builder = self._init_job(joborder, runtimeContext)
-        check_cluster_target(self, builder, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if not req: