Merge branch '15655-logtail-encoding'
[arvados.git] / sdk / cwl / arvados_cwl / arvworkflow.py
index 2f114f4ffb1c72176aab8e43d0d93659a71feb9f..604ad39de78877b96121fae730eb12ea0da080d6 100644 (file)
@@ -2,6 +2,9 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
+from past.builtins import basestring
+from future.utils import viewitems
+
 import os
 import json
 import copy
@@ -12,17 +15,18 @@ from schema_salad.sourceline import SourceLine, cmap
 from cwltool.pack import pack
 from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
-from cwltool.workflow import Workflow, WorkflowException
+from cwltool.workflow import Workflow, WorkflowException, WorkflowStep
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.builder import Builder
 from cwltool.context import LoadingContext
 
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
-                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+                     make_builder)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -118,25 +122,46 @@ def get_overall_res_req(res_reqs):
             overall_res_req["class"] = "ResourceRequirement"
         return cmap(overall_res_req)
 
+class ArvadosWorkflowStep(WorkflowStep):
+    # WorkflowStep subclass that remembers the Arvados runner it was created
+    # with and passes each job's runtime context through set_cluster_target
+    # before delegating to the base class.
+    def __init__(self,
+                 toolpath_object,      # type: Dict[Text, Any]
+                 pos,                  # type: int
+                 loadingContext,       # type: LoadingContext
+                 arvrunner,
+                 *argc,
+                 **argv
+                ):  # type: (...) -> None
+
+        # arvrunner is consumed here; every other argument is forwarded to
+        # WorkflowStep.__init__ unchanged.
+        super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+        # Set after the base constructor has populated self.tool.
+        self.tool["class"] = "WorkflowStep"
+        self.arvrunner = arvrunner
+
+    def job(self, joborder, output_callback, runtimeContext):
+        # Rebind to a copy so the toplevel assignment below lands on the copy
+        # rather than on the object the caller passed in.
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.toplevel = True  # Preserve behavior for #13365
+
+        # The builder receives the job order re-keyed by shortname(k), together
+        # with this step's hints and requirements.
+        builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements, runtimeContext)
+        # set_cluster_target returns the runtime context to use from here on.
+        # NOTE(review): presumably it applies the arv:ClusterTarget settings
+        # (cluster ID / owner UUID) -- confirm against arvtool.set_cluster_target.
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+        return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
     def __init__(self, arvrunner, toolpath_object, loadingContext):
-        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        # Instance attributes are assigned before the base constructor runs.
+        # make_workflow_step reads self.arvrunner; presumably Workflow.__init__
+        # calls it while building steps -- TODO confirm against cwltool.
         self.arvrunner = arvrunner
         self.wf_pdh = None
         self.dynamic_resource_req = []
         self.static_resource_req = []
         self.wf_reffiles = []
         self.loadingContext = loadingContext
+        super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
+        # Look up the ClusterTarget requirement once at construction time; the
+        # second value returned by get_requirement is discarded.
+        self.cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
 
     def job(self, joborder, output_callback, runtimeContext):
 
-        cluster_target_req, _ = self.get_requirement("http://arvados.org/cwl#ClusterTarget")
-        if cluster_target_req and runtimeContext.cluster_target_id != id(cluster_target_req):
-            runtimeContext.cluster_target_id = id(cluster_target_req)
-            runtimeContext.submit_runner_cluster = builder.do_eval(cluster_target_req.get("clusterID")) or runtimeContext.submit_runner_cluster
-            runtimeContext.project_uuid = builder.do_eval(cluster_target_req.get("ownerUUID")) or runtimeContext.project_uuid
+        builder = make_builder(joborder, self.hints, self.requirements, runtimeContext)
+        runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
 
         req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
         if not req:
@@ -149,7 +174,8 @@ class ArvadosWorkflow(Workflow):
                 raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
         document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
 
-        discover_secondary_files(self.tool["inputs"], joborder)
+        discover_secondary_files(self.arvrunner.fs_access, builder,
+                                 self.tool["inputs"], joborder)
 
         with Perf(metrics, "subworkflow upload_deps"):
             upload_dependencies(self.arvrunner,
@@ -165,11 +191,6 @@ class ArvadosWorkflow(Workflow):
 
                 packed = pack(document_loader, workflowobj, uri, self.metadata)
 
-                builder = Builder(joborder,
-                                  requirements=workflowobj["requirements"],
-                                  hints=workflowobj["hints"],
-                                  resources={})
-
                 def visit(item):
                     for t in ("hints", "requirements"):
                         if t not in item:
@@ -190,6 +211,9 @@ class ArvadosWorkflow(Workflow):
                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                 if not dyn:
                                     self.static_resource_req.append(req)
+                            if req["class"] == "DockerRequirement":
+                                if "http://arvados.org/cwl#dockerCollectionPDH" in req:
+                                    del req["http://arvados.org/cwl#dockerCollectionPDH"]
 
                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
 
@@ -209,11 +233,6 @@ class ArvadosWorkflow(Workflow):
 
 
         if self.dynamic_resource_req:
-            builder = Builder(joborder,
-                              requirements=self.requirements,
-                              hints=self.hints,
-                              resources={})
-
             # Evaluate dynamic resource requirements using current builder
             rs = copy.copy(self.static_resource_req)
             for dyn_rs in self.dynamic_resource_req:
@@ -272,6 +291,16 @@ class ArvadosWorkflow(Workflow):
                 adjustDirObjs(packed, keepmount)
                 self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
 
+        self.loadingContext = self.loadingContext.copy()
+        self.loadingContext.metadata = self.loadingContext.metadata.copy()
+        self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
+
+        if len(job_res_reqs) == 1:
+            # RAM request needs to be at least 128 MiB or the workflow
+            # runner itself won't run reliably.
+            if job_res_reqs[0].get("ramMin", 1024) < 128:
+                job_res_reqs[0]["ramMin"] = 128
+
         wf_runner = cmap({
             "class": "CommandLineTool",
             "baseCommand": "cwltool",
@@ -295,3 +324,13 @@ class ArvadosWorkflow(Workflow):
             "id": "#"
         })
         return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
+
+    def make_workflow_step(self,
+                           toolpath_object,      # type: Dict[Text, Any]
+                           pos,                  # type: int
+                           loadingContext,       # type: LoadingContext
+                           *argc,
+                           **argv
+    ):
+        # (...) -> WorkflowStep
+        # Construct an ArvadosWorkflowStep, inserting this workflow's arvrunner
+        # after loadingContext and forwarding all other arguments unchanged.
+        # NOTE(review): presumably overrides the cwltool Workflow hook of the
+        # same name -- confirm against the installed cwltool version.
+        return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)