14510: Setting collection cache wip
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index f86641bfd60bdeeb3e663036ee641d7e7c9499f9..f514476b9099895d3739dbb34ab0a0c372bd7e73 100644 (file)
@@ -12,9 +12,8 @@ from schema_salad.sourceline import SourceLine, cmap
 from cwltool.pack import pack
 from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
-from cwltool.workflow import Workflow, WorkflowException
+from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
@@ -22,7 +21,7 @@ import ruamel.yaml as yaml
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
                      trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -118,25 +117,46 @@ def get_overall_res_req(res_reqs):
             overall_res_req["class"] = "ResourceRequirement"
         return cmap(overall_res_req)
 
+class ArvadosWorkflowStep(WorkflowStep):
+    def __init__(self,
+                 toolpath_object,      # type: Dict[Text, Any]
+                 pos,                  # type: int
+                 loadingContext,       # type: LoadingContext
+                 arvrunner,
+                 *argc,
+                 **argv
+                ):  # type: (...) -> None
+
+        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+        self.tool["class"] = "WorkflowStep"
+        self.arvrunner = arvrunner
+
+    def job(self, joborder, output_callback, runtimeContext):
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.toplevel = True  # Preserve behavior for #13365
+
+        builder = make_builder({shortname(k): v for k,v in joborder.items()}, self.hints, self.requirements, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, loadingContext):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
         self.arvrunner = arvrunner
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
         self.loadingContext = loadingContext
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
-        if runtimeContext.cluster_target is None or runtimeContext.cluster_target.instance != id(cluster_target_req):
-            runtimeContext.cluster_target = ClusterTarget(id(cluster_target_req),
-                                                          builder.do_eval(cluster_target_req.get("clusterID")),
-                                                          builder.do_eval(cluster_target_req.get("ownerUUID")))
+        builder = self._init_job(joborder, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if not req:
@@ -165,11 +185,6 @@ class ArvadosWorkflow(Workflow):
 
                 packed = pack(document_loader, workflowobj, uri, self.metadata)
 
-                builder = Builder(joborder,
-                                  requirements=workflowobj["requirements"],
-                                  hints=workflowobj["hints"],
-                                  resources={})
-
                 def visit(item):
                     for t in ("hints", "requirements"):
                         if t not in item:
@@ -209,11 +224,6 @@ class ArvadosWorkflow(Workflow):
 
 
         if self.dynamic_resource_req:
-            builder = Builder(joborder,
-                              requirements=self.requirements,
-                              hints=self.hints,
-                              resources={})
-
             # Evaluate dynamic resource requirements using current builder
             rs = copy.copy(self.static_resource_req)
             for dyn_rs in self.dynamic_resource_req:
@@ -295,3 +305,13 @@ class ArvadosWorkflow(Workflow):
             "id": "#"
         })
         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+
+    def make_workflow_step(self,
+                           toolpath_object,      # type: Dict[Text, Any]
+                           pos,                  # type: int
+                           loadingContext,       # type: LoadingContext
+                           *argc,
+                           **argv
+    ):
+        # (...) -> WorkflowStep
+        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)